Build a RAG chatbot with GKE and Cloud Storage


This tutorial shows you how to integrate a large language model (LLM) application based on retrieval-augmented generation (RAG) with PDF files that you upload to a Cloud Storage bucket.

This guide uses a database as a storage and semantic search engine that holds the representations (embeddings) of the uploaded documents. You use the Langchain framework to interact with the embeddings, and you use Gemini models available through Vertex AI.

Langchain is a popular open-source Python framework that simplifies many machine learning tasks and has interfaces to integrate with different vector databases and AI services.

This tutorial is intended for cloud platform administrators and architects, ML engineers, and MLOps (DevOps) professionals interested in deploying RAG LLM applications to GKE and Cloud Storage.

Objectives

In this tutorial, you learn how to do the following:

  • Build and deploy an application that creates and stores document embeddings in a vector database.
  • Automate the application so that new document uploads to a Cloud Storage bucket trigger it.
  • Deploy a chatbot application that uses semantic search to answer questions based on the document content.

Deployment architecture

In this tutorial, you create a Cloud Storage bucket, an Eventarc trigger, and the following services:

  • embed-docs: Eventarc triggers this service every time a user uploads a new document to the Cloud Storage bucket. The service starts a Kubernetes Job that creates embeddings for the uploaded document and inserts them into a vector database.
  • chatbot: this service answers natural language questions about the uploaded documents using semantic search and the Gemini API.

The following diagram shows the process of uploading and vectorizing documents:

In the diagram, the user uploads files to the Cloud Storage bucket. Eventarc subscribes to object metadataUpdated events for the bucket and uses the Eventarc event forwarder, which is a Kubernetes workload, to call the embed-docs service when you upload a new document. The embed-docs service then creates embeddings for the uploaded document using the Vertex AI embedding model and stores them in a vector database.
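
The upload path described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the tutorial's code: `toy_embed` is a hypothetical stand-in for the Vertex AI embedding model, `InMemoryVectorStore` is a hypothetical stand-in for the vector database, and `handle_upload` is a hypothetical name for the step that the embedding job performs.

```python
import hashlib
import math
from dataclasses import dataclass, field


def toy_embed(text: str, dim: int = 8) -> list[float]:
    """Hypothetical stand-in for an embedding model: hash words into a unit vector."""
    vec = [0.0] * dim
    for word in text.lower().split():
        digest = hashlib.sha256(word.encode("utf-8")).digest()
        vec[digest[0] % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]


@dataclass
class InMemoryVectorStore:
    """Hypothetical stand-in for the vector database: rows of (source, text, vector)."""
    rows: list[tuple[str, str, list[float]]] = field(default_factory=list)

    def add(self, source: str, text: str) -> None:
        self.rows.append((source, text, toy_embed(text)))


def handle_upload(store: InMemoryVectorStore, bucket: str, name: str, pages: list[str]) -> int:
    """Embed every non-empty page of an uploaded document and store it.

    Returns the number of pages that were stored.
    """
    source = f"gs://{bucket}/{name}"
    stored = 0
    for page in pages:
        if page.strip():
            store.add(source, page)
            stored += 1
    return stored
```

In the real deployment the embedding call goes to Vertex AI and the store is Qdrant, Elasticsearch, PGVector, or Weaviate; the sketch only shows the order of operations.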

The following diagram shows the process of asking questions about the uploaded document content using the chatbot service:

Users can ask questions in natural language. The chatbot generates answers based solely on the content of the uploaded files. It retrieves context from the vector database using semantic search, and then sends the question and the context to Gemini.
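
The retrieve-then-ask flow above can be illustrated with a small, self-contained sketch. It assumes that embeddings are plain lists of floats and ranks stored chunks by cosine similarity. The function names and the prompt wording are hypothetical, and the real chatbot delegates these steps to Langchain and the vector database.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(query_vec: list[float], rows: list[tuple[str, list[float]]], k: int = 2) -> list[str]:
    """Return the k stored texts whose vectors are most similar to the query vector."""
    ranked = sorted(rows, key=lambda row: cosine(query_vec, row[1]), reverse=True)
    return [text for text, _ in ranked[:k]]


def build_prompt(question: str, context_chunks: list[str]) -> str:
    """Combine the retrieved context and the question into one prompt (hypothetical wording)."""
    context = "\n".join(context_chunks) if context_chunks else "(no context found)"
    return (
        "Answer using only the context below. "
        "If the context does not contain the answer, say \"I don't know\".\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )
```

The prompt string is what would be sent to the LLM. Restricting the answer to the supplied context is what makes the chatbot reply "I don't know" to unrelated questions.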

Costs

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage for this tutorial, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks described in this document, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

In this tutorial, you use Cloud Shell to run commands. Cloud Shell is a shell environment for managing resources hosted on Google Cloud. It comes preinstalled with the Google Cloud CLI, kubectl, and Terraform command-line tools. If you don't use Cloud Shell, you must install the Google Cloud CLI.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Vertex AI, Cloud Build, Eventarc, Artifact Registry APIs:

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/eventarc.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com.
    • Replace ROLE with each individual role.
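
When several roles have to be granted, the "run once for each role" instruction can be scripted. The following is a minimal sketch that only builds the argument lists for the gcloud command shown above; the function name is hypothetical and the project and user values in the usage comment are placeholders.

```python
def iam_binding_commands(project_id: str, member: str, roles: list[str]) -> list[list[str]]:
    """Return one `gcloud projects add-iam-policy-binding` argument list per role."""
    return [
        [
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member={member}",
            f"--role={role}",
        ]
        for role in roles
    ]


# Example usage (placeholder values); pass each list to subprocess.run(argv, check=True):
# for argv in iam_binding_commands("my-project", "user:myemail@example.com", ["roles/eventarc.admin"]):
#     print(" ".join(argv))
```

Building argument lists instead of a single shell string avoids quoting problems if an identifier contains special characters.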

Create a cluster

Create a Qdrant, Elasticsearch, Postgres, or Weaviate cluster:

Qdrant

Follow the instructions in Deploy a Qdrant vector database on GKE to create a Qdrant cluster running on an Autopilot or Standard mode GKE cluster.

Elasticsearch

Follow the instructions in Deploy an Elasticsearch vector database on GKE to create an Elasticsearch cluster running on an Autopilot or Standard mode GKE cluster.

PGVector

Follow the instructions in Deploy a PostgreSQL vector database on GKE to create a Postgres cluster with PGVector running on an Autopilot or Standard mode GKE cluster.

Weaviate

Follow the instructions in Deploy a Weaviate vector database on GKE to create a Weaviate cluster running on an Autopilot or Standard mode GKE cluster.

Set up your environment

Set up your environment with Cloud Shell:

  1. Set the environment variables for your project:

    Qdrant

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=qdrant
    export REGION=us-central1
    export DB_NAMESPACE=qdrant
    

    Replace PROJECT_ID with your Google Cloud project ID.

    Elasticsearch

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=elasticsearch
    export REGION=us-central1
    export DB_NAMESPACE=elastic
    

    Replace PROJECT_ID with your Google Cloud project ID.

    PGVector

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=postgres
    export REGION=us-central1
    export DB_NAMESPACE=pg-ns
    

    Replace PROJECT_ID with your Google Cloud project ID.

    Weaviate

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=weaviate
    export REGION=us-central1
    export DB_NAMESPACE=weaviate
    

    Replace PROJECT_ID with your Google Cloud project ID.

  2. Verify that your GKE cluster is running:

    gcloud container clusters list --project=${PROJECT_ID} --region=${REGION}
    

    The output is similar to the following:

    NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
    [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
    
  3. Clone the sample code repository from GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Navigate to the databases directory:

    cd kubernetes-engine-samples/databases
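
The four variants in step 1 differ only in the cluster prefix and the namespace. As a minimal sketch, the mapping can be captured in a small table so that the export lines are generated instead of copied by hand. The values are taken from the export statements in this section; the class and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DbSettings:
    cluster_prefix: str
    namespace: str


# Values copied from the export statements in this section.
DB_SETTINGS = {
    "qdrant": DbSettings("qdrant", "qdrant"),
    "elasticsearch": DbSettings("elasticsearch", "elastic"),
    "pgvector": DbSettings("postgres", "pg-ns"),
    "weaviate": DbSettings("weaviate", "weaviate"),
}


def export_lines(database: str, project_id: str, region: str = "us-central1") -> list[str]:
    """Return the shell export lines for the chosen vector database."""
    settings = DB_SETTINGS[database.lower()]
    return [
        f"export PROJECT_ID={project_id}",
        f"export KUBERNETES_CLUSTER_PREFIX={settings.cluster_prefix}",
        f"export REGION={region}",
        f"export DB_NAMESPACE={settings.namespace}",
    ]
```

Note that for PGVector and Elasticsearch the namespace differs from the cluster prefix, which is easy to miss when switching between databases.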
    

Prepare your infrastructure

Create an Artifact Registry repository, build Docker images, and push the Docker images to Artifact Registry:

  1. Create an Artifact Registry repository:

    gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
        --repository-format=docker \
        --location=${REGION} \
        --description="Vector database images repository" \
        --async
    
  2. Grant the storage.objectAdmin and artifactregistry.admin roles to the Compute Engine service account so that Cloud Build can build and push the Docker images for the embed-docs and chatbot services.

    export PROJECT_NUMBER=PROJECT_NUMBER
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/artifactregistry.admin"
    

    Replace PROJECT_NUMBER with your Google Cloud project number.

  3. Build Docker images for the embed-docs and chatbot services. The embed-docs image contains the Python code for both the application that receives Eventarc forwarder requests and the embedding job.

    Qdrant

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Elasticsearch

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    PGVector

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    

    Weaviate

    export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
    gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
      --tag ${DOCKER_REPO}/chatbot:1.0 --async
    gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
      --tag ${DOCKER_REPO}/embed-docs:1.0 --async
    
  4. Verify the images:

    gcloud artifacts docker images list $DOCKER_REPO \
        --project=$PROJECT_ID \
        --format="value(IMAGE)"
    

    The output is similar to the following:

    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
    $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
    
  5. Deploy a Kubernetes service account with permissions to run Kubernetes Jobs:

    Qdrant

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
    
  6. If you used Terraform to create the GKE cluster and set create_service_account to true, a separate service account is created and used by the cluster and its nodes. Grant the artifactregistry.serviceAgent role to this Compute Engine service account so that the nodes can pull images from the Artifact Registry repository created for embed-docs and chatbot.

    export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
    --region=${REGION} \
    --format="value(nodeConfig.serviceAccount)")
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
    --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
    --role="roles/artifactregistry.serviceAgent"
    

    Without this access, the nodes might hit permission errors when they try to pull the images from Artifact Registry while deploying the embed-docs and chatbot services.

  7. Create Kubernetes Deployments for the embed-docs and chatbot services:

    Qdrant

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
    

    Elasticsearch

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
    

    PGVector

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
    

    Weaviate

    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
    sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
    
  8. Enable Eventarc triggers for GKE:

    gcloud eventarc gke-destinations init
    

    When prompted, type y.

  9. Deploy the Cloud Storage bucket and create an Eventarc trigger using Terraform:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=vector-database/terraform/cloud-storage init
    terraform -chdir=vector-database/terraform/cloud-storage apply \
      -var project_id=${PROJECT_ID} \
      -var region=${REGION} \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
      -var db_namespace=${DB_NAMESPACE}
    

    When prompted, type yes. The command might take a few minutes to complete.

    Terraform creates the following resources:

    • A Cloud Storage bucket for uploading the documents
    • An Eventarc trigger
    • A Google Cloud service account named service_account_eventarc_name with permission to use Eventarc
    • A Google Cloud service account named service_account_bucket_name with permission to read the bucket and access Vertex AI models

    The output is similar to the following:

    ... # Several lines of output omitted
    
    Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
    
    ... # Several lines of output omitted
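
The commands in this tutorial derive several resource names from the same three variables. The following sketch collects those naming patterns in one place, which can help when checking that the Terraform variables and the image references agree. The patterns are read off the commands in this tutorial (DOCKER_REPO, the `-cluster` suffix, and the `-training-docs` bucket used in the upload step); the function names are hypothetical.

```python
def docker_repo(region: str, project_id: str, cluster_prefix: str) -> str:
    """Artifact Registry repository path, matching the DOCKER_REPO export."""
    return f"{region}-docker.pkg.dev/{project_id}/{cluster_prefix}-images"


def image_ref(repo: str, service: str, tag: str = "1.0") -> str:
    """Full image reference for a service such as chatbot or embed-docs."""
    return f"{repo}/{service}:{tag}"


def cluster_name(cluster_prefix: str) -> str:
    """GKE cluster name as used by `gcloud container clusters describe`."""
    return f"{cluster_prefix}-cluster"


def docs_bucket(project_id: str, cluster_prefix: str) -> str:
    """Cloud Storage bucket name as used by the upload command."""
    return f"{project_id}-{cluster_prefix}-training-docs"
```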
    

Upload documents and run chatbot queries

Upload the demo documents and run queries to search over them using the chatbot:

  1. Upload the example carbon-free-energy.pdf document to your bucket:

    gsutil cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
    
  2. Verify that the document embedding job completed successfully:

    kubectl get job -n ${DB_NAMESPACE}
    

    The output is similar to the following:

    NAME                            COMPLETIONS   DURATION   AGE
    docs-embedder1716570453361446   1/1           32s        71s
    
  3. Get the external IP address of the load balancer:

    export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
    echo http://${EXTERNAL_IP}:80
    
  4. Open the external IP address in your web browser:

    http://EXTERNAL_IP
    

    The chatbot responds with a message similar to the following:

    How can I help you?
    
  5. Ask questions about the content of the uploaded documents. If the chatbot can't find anything, it answers I don't know. For example, you might ask the following:

    You: Hi, what are Google plans for the future?
    

    An example output from the chatbot is similar to the following:

    Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
    
  6. Ask the chatbot a question that is unrelated to the uploaded document. For example, you might ask the following:

    You: What are Google plans to colonize Mars?
    

    An example output from the chatbot is similar to the following:

    Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
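
Step 2 above checks the embedding job by eye. As a minimal sketch, the same check can be automated by parsing the table that `kubectl get job` prints. The parser assumes the default whitespace-separated table layout shown in step 2 and that no value contains spaces; the function names are hypothetical.

```python
def parse_job_table(output: str) -> list[dict[str, str]]:
    """Parse the `kubectl get job` table into dicts keyed by lower-cased column names."""
    lines = [line for line in output.splitlines() if line.strip()]
    if not lines:
        return []
    keys = [column.lower() for column in lines[0].split()]
    return [dict(zip(keys, line.split())) for line in lines[1:]]


def is_complete(job: dict[str, str]) -> bool:
    """True when the COMPLETIONS column reads N/M with N >= M > 0."""
    done, _, wanted = job.get("completions", "").partition("/")
    if not (done.isdigit() and wanted.isdigit()):
        return False
    return int(wanted) > 0 and int(done) >= int(wanted)
```

Because endpoint.py creates each Job with ttl_seconds_after_finished=60, Kubernetes may delete a finished Job about a minute after it completes, so an empty list some time after an upload does not necessarily mean that the job failed.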
    

About the application code

This section explains how the application code works. There are three scripts inside the Docker images:

  • endpoint.py: receives Eventarc events on each document upload and starts the Kubernetes Jobs that process them.
  • embedding-job.py: downloads documents from the bucket, creates embeddings, and inserts the embeddings into the vector database.
  • chat.py: runs queries over the content of the stored documents.

The following diagram shows the process of generating answers using the documents' data:

In the diagram, the application loads a PDF file, splits the file into chunks, converts the chunks into vectors, and then sends the vectors to a vector database. Later, a user asks the chatbot a question. The RAG chain uses semantic search to query the vector database, and then returns the context along with the question to the LLM. The LLM answers the question, and the question is stored in the chat history.
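
The chunking step can be illustrated with a short sketch. It splits text into fixed-size character windows that overlap, so that a sentence cut at a boundary still appears whole in one of the neighbouring chunks. This is an assumption-laden illustration: the chunk size and overlap are hypothetical values, and the splitter settings used by the tutorial's own code are not shown in this section.

```python
def split_into_chunks(text: str, chunk_size: int = 1000, overlap: int = 100) -> list[str]:
    """Split text into overlapping character windows.

    Consecutive chunks share `overlap` characters. The last chunk may be shorter.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks
```

Each chunk is then embedded separately, which is why the chatbot can retrieve only the passages that are relevant to a question instead of the whole document.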

About endpoint.py

This file processes messages from Eventarc, creates a Kubernetes Job to embed the document, and accepts requests from anywhere on port 5001.
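
The handler in endpoint.py reads the bucket, name, and generation fields from the event body and appends the generation to "docs-embedder" to form the Job name. The following sketch shows one way to validate those inputs before creating a Job. It is not part of the tutorial's code: the function names are hypothetical, and the name check is a deliberately conservative rule (lowercase letters, digits, and hyphens, at most 63 characters).

```python
import re

REQUIRED_FIELDS = ("bucket", "name", "generation")
# Conservative pattern: lowercase alphanumerics and hyphens, alphanumeric at both ends.
_NAME_PATTERN = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")


def parse_storage_event(payload: object) -> tuple[str, str, str]:
    """Return (bucket, object name, generation) or raise ValueError."""
    if not isinstance(payload, dict):
        raise ValueError("event body must be a JSON object")
    missing = [key for key in REQUIRED_FIELDS if key not in payload]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    return str(payload["bucket"]), str(payload["name"]), str(payload["generation"])


def job_name_for(generation: str) -> str:
    """Build the Job name the same way endpoint.py does, then sanity-check it."""
    name = "docs-embedder" + generation
    if len(name) > 63 or not _NAME_PATTERN.match(name):
        raise ValueError(f"invalid job name: {name!r}")
    return name
```

Rejecting a malformed event with a clear error is usually easier to debug than a KeyError inside the request handler or a rejected API call later on.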

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
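
def kube_test_credentials():
    # Create the API client here; api_instance was previously undefined.
    api_instance = client.BatchV1Api()
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)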

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
        client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

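def kube_test_credentials():
    # Create the API client here; api_instance was previously undefined.
    api_instance = client.BatchV1Api()
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)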

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
        client.V1EnvVar(name="DATABASE_NAME", value="app"), 
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
        client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
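
def kube_test_credentials():
    # Create the API client here; api_instance was previously undefined.
    api_instance = client.BatchV1Api()
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)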

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
        client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
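
def kube_test_credentials():
    # Create the API client here; api_instance was previously undefined.
    api_instance = client.BatchV1Api()
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)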

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
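
The endpoint above reads three fields from the Cloud Storage event (bucket, name, and generation) and turns them into a Job name and environment variables. The following is a minimal sketch of that mapping with no Kubernetes client involved. The helper name `job_params_from_event` and the sample payload are hypothetical; only the field names and the `docs-embedder` prefix come from the code above.

```python
def job_params_from_event(event: dict) -> dict:
    """Map a Cloud Storage event payload to the values the Job needs.

    Hypothetical helper for illustration; it mirrors what bucket() and
    kube_create_job() do together in the endpoint code.
    """
    generation = str(event["generation"])
    return {
        # A new generation is assigned whenever an object is written,
        # so each upload gets its own Job name.
        "job_name": "docs-embedder" + generation,
        "env": {
            "BUCKET_NAME": event["bucket"],
            "FILE_NAME": event["name"],
        },
    }


# Made-up sample payload containing only the fields the endpoint reads.
sample_event = {
    "bucket": "my-docs-bucket",
    "name": "carbon-free-energy.pdf",
    "generation": "1712345678901234",
}
params = job_params_from_event(sample_event)
print(params["job_name"])
print(params["env"])
```

Because the generation is part of the name, uploading the same file again creates a separate Job rather than colliding with the earlier one.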

About embedding-job.py

This file processes documents and sends them to the vector database.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores import Qdrant
from qdrant_client import QdrantClient
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name="gemini-pro", streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("textembedding-gecko@001")

client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("APIKEY"),
)
collection_name = os.getenv("COLLECTION_NAME")
vector_search = Qdrant(client, collection_name, embeddings=embedding_model)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bob",
        human_prefix="User",
        k=3,
    )
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])
if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({})["history"])
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from google.cloud import storage
import os

bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("textembedding-gecko@001")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)

db = ElasticsearchStore.from_documents(
    documents,
    embeddings,
    es_connection=client,
    index_name=os.getenv("INDEX_NAME")
)
db.client.indices.refresh(index=os.getenv("INDEX_NAME"))

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.pgvector import PGVector
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
for document in documents:
    document.page_content = document.page_content.replace('\x00', '')

embeddings = VertexAIEmbeddings("textembedding-gecko@001")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

db = PGVector.from_documents(
    embedding=embeddings,
    documents=documents,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    use_jsonb=True
)

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")

storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)

loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)

embeddings = VertexAIEmbeddings("textembedding-gecko@001")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()
if not client.collections.exists("trainingdocs"):
    collection = client.collections.create(name="trainingdocs")
db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")

print(filename + " was successfully embedded") 
print(f"# of vectors = {len(documents)}")
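
The embedding jobs above split the PDF text with chunk_size=1000 and chunk_overlap=0 before embedding it, and store one vector per chunk. As a rough illustration of what those two settings mean, here is a simplified stand-in that uses plain fixed-size slicing. It is not how RecursiveCharacterTextSplitter works internally: the LangChain splitter tries to break on separators such as paragraph and line breaks, so its chunks are usually shorter than the limit. The function `split_fixed` and the sample text are hypothetical.

```python
def split_fixed(text: str, chunk_size: int = 1000) -> list:
    """Cut text into consecutive pieces of at most chunk_size characters.

    Simplified illustration only: no overlap between pieces, and no
    attempt to break on paragraph or sentence boundaries.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]


# Made-up stand-in for the text extracted from a PDF.
page_text = "x" * 2500
chunks = split_fixed(page_text)

# One vector would be stored per chunk.
print(f"# of chunks = {len(chunks)}")
print([len(c) for c in chunks])
```

With no overlap, the chunks concatenate back to the original text, so nothing is embedded twice.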

About chat.py

This file configures the model to answer questions using only the provided context and the previous answers. If the context or the conversation history doesn't match any data, the model returns I don't know.

Qdrant

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from flask import Flask, jsonify
from flask import request
import logging
import sys,os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException


app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})


@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation'] 
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Set up Kubernetes configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars=None):

    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()

    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
        client.V1EnvVar(name="FILE_NAME", value=f_name), 
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
    ]

    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')

    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
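    return body

def kube_test_credentials():
    # Smoke test: list the core API resources to confirm the in-cluster credentials work.
    api_instance = client.CoreV1Api()
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        logging.error("Exception when calling API: %s", e)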

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
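    # Pass the namespace so the Job metadata matches the namespace used in create_namespaced_job.
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)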
    v1=client.BatchV1Api()
    try: 
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)

Elasticsearch

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name="gemini-pro", streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("textembedding-gecko@001")

client = Elasticsearch(
    [os.getenv("ES_URL")], 
    verify_certs=False, 
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD"))
)
vector_search = ElasticsearchStore(
    index_name=os.getenv("INDEX_NAME"),
    es_connection=client,
    embedding=embedding_model
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({})["history"])
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

PGVector

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores.pgvector import PGVector
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name="gemini-pro", streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("textembedding-gecko@001")

CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

vector_search = PGVector(
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    embedding_function=embedding_model,
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({})["history"])
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})

Weaviate

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
import streamlit as st
import os

vertexAI = ChatVertexAI(model_name="gemini-pro", streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or context is not relevant, response with "I don't know".

        text_context:
        {context}

        conversation_history:
        {history}

        query:
        {query}
        """),
    ]
)

embedding_model = VertexAIEmbeddings("textembedding-gecko@001")

auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config
)
client.connect()

vector_search = WeaviateVectorStore.from_documents([],embedding_model,client=client, index_name="trainingdocs")

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])

st.title("🤖 Chatbot")
if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]

if "memory" not in st.session_state:
    st.session_state["memory"] = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

if chat_input := st.chat_input():
    with st.chat_message("human"):
        st.write(chat_input)
        st.session_state.messages.append({"role": "human", "content": chat_input})

    found_docs = vector_search.similarity_search(chat_input)
    context = format_docs(found_docs)

    prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({})["history"])
    with st.chat_message("ai"):
        with st.spinner("Typing..."):
            content = ""
            with st.empty():
                for chunk in vertexAI.stream(prompt_value):
                    content += chunk.content
                    st.write(content)
            st.session_state.messages.append({"role": "ai", "content": content})

    st.session_state.memory.save_context({"input": chat_input}, {"output": content})
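
The chat scripts above keep only the last three exchanges (k=3) in memory and pass them to the prompt as the conversation history. The following sketch shows that sliding-window behavior using only the standard library. The class `WindowMemory` is a hypothetical, simplified stand-in and not the LangChain ConversationBufferWindowMemory implementation; the questions and answers are made up.

```python
from collections import deque


class WindowMemory:
    """Keep the last k question/answer exchanges (illustrative stand-in)."""

    def __init__(self, k: int = 3, human_prefix: str = "User", ai_prefix: str = "Bot"):
        # A deque with maxlen drops the oldest exchange automatically.
        self.turns = deque(maxlen=k)
        self.human_prefix = human_prefix
        self.ai_prefix = ai_prefix

    def save_context(self, user_input: str, ai_output: str) -> None:
        self.turns.append((user_input, ai_output))

    def history(self) -> str:
        # Render the retained exchanges as prefixed lines for the prompt.
        lines = []
        for question, answer in self.turns:
            lines.append(f"{self.human_prefix}: {question}")
            lines.append(f"{self.ai_prefix}: {answer}")
        return "\n".join(lines)


memory = WindowMemory(k=3)
for i in range(1, 6):
    memory.save_context(f"question {i}", f"answer {i}")

# Only exchanges 3, 4, and 5 remain in the history text.
print(memory.history())
```

A small window keeps the prompt short, at the cost of the model forgetting anything said more than k exchanges ago.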

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to avoid billing is to delete the project that you created for the tutorial.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

If you deleted the project, the cleanup is complete. If you didn't delete the project, delete the individual resources.

Delete individual resources

  1. Delete the Artifact Registry repository:

    gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
        --location=${REGION} \
        --async
    

    When prompted, type y.

  2. Delete the Cloud Storage bucket and the Eventarc trigger:

    export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
    terraform -chdir=vector-database/terraform/cloud-storage destroy \
      -var project_id=${PROJECT_ID} \
      -var region=${REGION} \
      -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
      -var db_namespace=${DB_NAMESPACE}
    

    When prompted, type yes.

    Eventarc requires a valid destination endpoint both when a trigger is created and when it is deleted.

What's next