Create a RAG chatbot with GKE and Cloud Storage

This tutorial shows you how to integrate a large language model (LLM) application based on retrieval-augmented generation (RAG) with PDF files that you upload to a Cloud Storage bucket.

This guide uses a database as a storage and semantic search engine that holds the representations (embeddings) of the uploaded documents. You use the LangChain framework to interact with the embeddings, and you use Gemini models available through Vertex AI.

LangChain is a popular open source Python framework that simplifies many machine learning tasks and provides interfaces for integrating with different AI services and vector databases.
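
The following minimal, self-contained sketch illustrates that interface. It is illustrative only; it assumes the same langchain-google-vertexai and langchain-community packages that the services in this tutorial use, and an in-memory Qdrant collection instead of a real cluster:

    # Embed two example texts and run a semantic search over them.
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import Qdrant

    embeddings = VertexAIEmbeddings("text-embedding-005")
    store = Qdrant.from_texts(
        ["GKE runs containerized workloads.", "Cloud Storage stores objects such as PDFs."],
        embeddings,
        location=":memory:",  # throwaway local collection, no server needed
        collection_name="demo",
    )
    print(store.similarity_search("Where are my PDF files stored?", k=1))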

This tutorial is intended for cloud platform administrators and architects, ML engineers, and MLOps (DevOps) professionals who are interested in deploying RAG LLM applications to GKE and Cloud Storage.

Objectives

In this tutorial, you learn how to:

  • Build and deploy an application that creates and stores document embeddings in a vector database.
  • Automate the application so that it's triggered by new document uploads to a Cloud Storage bucket.
  • Deploy a chatbot application that uses semantic search to answer questions based on the document content.

Deployment architecture

In this tutorial, you create a Cloud Storage bucket, an Eventarc trigger, and the following services:

  • embed-docs: Eventarc triggers this service each time a user uploads a new document to the Cloud Storage bucket. The service starts a Kubernetes Job that creates embeddings for the uploaded document and inserts them into a vector database.
  • chatbot: this service answers natural language questions about the uploaded documents by using semantic search and the Gemini API.

The following diagram shows the document upload and vectorization process:

In the diagram, the user uploads files to the Cloud Storage bucket. Eventarc subscribes to object metadataUpdated events for the bucket and uses Eventarc's event forwarder, which is a Kubernetes workload, to call the embed-docs service when you upload a new document. The embed-docs service then creates embeddings for the uploaded document by using the Vertex AI embedding model, and stores the embeddings in a vector database.
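
The event payload that embed-docs receives is a JSON object that describes the changed object. The fields that the service actually reads (see endpoint.py later in this tutorial) look like the following, with illustrative values:

    {
      "bucket": "my-project-qdrant-training-docs",
      "name": "carbon-free-energy.pdf",
      "generation": "1716570453361446"
    }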

The following diagram shows the process of asking questions about the content of the uploaded documents by using the chatbot service:

Users can ask questions in natural language, and the chatbot generates answers based solely on the content of the uploaded files. The chatbot retrieves context from the vector database by using semantic search, and then sends the question and the context to Gemini.
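
The following sketch condenses that answer path into a few lines. The names are illustrative; the real implementation is shown in chat.py later in this tutorial:

    # Hypothetical outline of the chatbot's answer path.
    def answer(question, vector_search, llm, prompt_template):
        # 1. Semantic search: fetch the document chunks most similar to the question.
        docs = vector_search.similarity_search(question)
        context = "\n\n".join(d.page_content for d in docs)
        # 2. Send the question plus the retrieved context to Gemini.
        prompt = prompt_template.format_messages(query=question, context=context, history="")
        return llm.invoke(prompt).content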

Costs

This document uses the following billable components of Google Cloud: GKE, Cloud Storage, Vertex AI, Eventarc, Cloud Build, and Artifact Registry.

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

In this tutorial, you use Cloud Shell to run commands. Cloud Shell is a shell environment for managing resources hosted on Google Cloud. Cloud Shell comes preinstalled with the Google Cloud CLI, kubectl, and Terraform command-line tools. If you don't use Cloud Shell, install the Google Cloud CLI.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Vertex AI, Cloud Build, Eventarc, Artifact Registry APIs:

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: eventarc.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
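
    For example, with an illustrative project ID and user email filled in, the command that grants the eventarc.admin role looks like the following:

      gcloud projects add-iam-policy-binding my-project \
          --member="user:myemail@example.com" \
          --role=roles/eventarc.admin
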
  9. Create a cluster

    Create a Qdrant, Elasticsearch, PGVector, or Weaviate cluster:

    Qdrant

    Follow the instructions in Deploy a Qdrant vector database on GKE to create a Qdrant cluster running on a GKE cluster in Autopilot or Standard mode.

    Elasticsearch

    Follow the instructions in Deploy an Elasticsearch vector database on GKE to create an Elasticsearch cluster running on a GKE cluster in Autopilot or Standard mode.

    PGVector

    Follow the instructions in Deploy a PostgreSQL vector database on GKE to create a Postgres cluster with PGVector running on a GKE cluster in Autopilot or Standard mode.

    Weaviate

    Follow the instructions in Deploy a Weaviate vector database on GKE to create a Weaviate cluster running on a GKE cluster in Autopilot or Standard mode.

    Set up your environment

    Set up your environment with Cloud Shell:

    1. Set the environment variables for your project:

      Qdrant

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=qdrant
      export REGION=us-central1
      export DB_NAMESPACE=qdrant
      

      Replace PROJECT_ID with your Google Cloud project ID.

      Elasticsearch

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=elasticsearch
      export REGION=us-central1
      export DB_NAMESPACE=elastic
      

      Replace PROJECT_ID with your Google Cloud project ID.

      PGVector

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=postgres
      export REGION=us-central1
      export DB_NAMESPACE=pg-ns
      

      Replace PROJECT_ID with your Google Cloud project ID.

      Weaviate

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=weaviate
      export REGION=us-central1
      export DB_NAMESPACE=weaviate
      

      Replace PROJECT_ID with your Google Cloud project ID.

    2. Verify that your GKE cluster is running:

      gcloud container clusters list --project=${PROJECT_ID} --region=${REGION}
      

      The output is similar to the following:

      NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
      [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
      
    3. Clone the sample code repository from GitHub:

      git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
      
    4. Go to the databases directory:

      cd kubernetes-engine-samples/databases
      

    Prepare the infrastructure

    Create an Artifact Registry repository, build the Docker images, and push the Docker images to Artifact Registry:

    1. Create an Artifact Registry repository:

      gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
          --repository-format=docker \
          --location=${REGION} \
          --description="Vector database images repository" \
          --async
      
    2. Grant the storage.objectAdmin and artifactregistry.admin roles to the Compute Engine service account, so that you can use Cloud Build to build and push the Docker images for the embed-docs and chatbot services:

      export PROJECT_NUMBER=PROJECT_NUMBER
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/storage.objectAdmin"
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/artifactregistry.admin"
      

      Replace PROJECT_NUMBER with your Google Cloud project number.
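
      If you don't know the project number, you can look it up with a standard gcloud command:

      gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)"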

    3. Build the Docker images for the embed-docs and chatbot services. The embed-docs image contains the Python code both for the application that receives Eventarc forwarded requests and for the embedding job.

      Qdrant

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Elasticsearch

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      PGVector

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Weaviate

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
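
      Because the builds are submitted with --async, the commands return immediately. To check the status of the builds, you can use a standard gcloud command:

      gcloud builds list --region=${REGION}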
      
    4. Verify the images:

      gcloud artifacts docker images list $DOCKER_REPO \
          --project=$PROJECT_ID \
          --format="value(IMAGE)"
      

      The output is similar to the following:

      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
      
    5. Deploy a Kubernetes service account with permissions to create Kubernetes Jobs:

      Qdrant

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
      
    6. When you use Terraform to create the GKE cluster with create_service_account set to true, a separate service account is created and used by the cluster and its nodes. Grant the artifactregistry.serviceAgent role to this Compute Engine service account so that the nodes can pull the embed-docs and chatbot images from Artifact Registry.

      export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
      --region=${REGION} \
      --format="value(nodeConfig.serviceAccount)")
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
      --role="roles/artifactregistry.serviceAgent"
      

      If you don't grant access to the service account, your nodes might encounter permission issues when they try to pull the images from Artifact Registry while deploying the embed-docs and chatbot services.
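
      One way to spot this problem (a generic kubectl check, not specific to this tutorial) is to look for Pods stuck in the ImagePullBackOff or ErrImagePull state:

      kubectl get pods -n ${DB_NAMESPACE}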

    7. Deploy Kubernetes Deployments for the embed-docs and chatbot services. A Deployment is a Kubernetes API object that lets you run multiple replicas of Pods that are distributed among the nodes in a cluster:

      Qdrant

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
      
    8. Enable Eventarc triggers for GKE:

      gcloud eventarc gke-destinations init
      

      When prompted, enter y.

    9. Deploy the Cloud Storage bucket and create an Eventarc trigger by using Terraform:

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage init
      terraform -chdir=vector-database/terraform/cloud-storage apply \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      When prompted, type yes. This command might take several minutes to complete.

      Terraform creates the following resources:

      • A Cloud Storage bucket to which you upload documents
      • An Eventarc trigger
      • A Google Cloud service account named service_account_eventarc_name with permission to use Eventarc
      • A Google Cloud service account named service_account_bucket_name with permission to read the bucket and access Vertex AI models

      The output is similar to the following:

      ... # Several lines of output omitted
      
      Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
      
      ... # Several lines of output omitted
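
      To confirm that the bucket exists, you can list its contents with gcloud. The bucket name follows the naming convention used in the next section:

      gcloud storage ls gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs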
      

    Upload documents and run chatbot queries

    Upload the demo documents, and then run queries to search them by using the chatbot:

    1. Upload the example carbon-free-energy.pdf document to the bucket:

      gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
      
    2. Verify that the document embedder job completed successfully:

      kubectl get job -n ${DB_NAMESPACE}
      

      The output is similar to the following:

      NAME                            COMPLETIONS   DURATION   AGE
      docs-embedder1716570453361446   1/1           32s        71s
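
      If the job doesn't complete, you can inspect its logs with a standard kubectl command. The job name below is illustrative; use the name from the previous output:

      kubectl logs -n ${DB_NAMESPACE} job/docs-embedder1716570453361446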
      
    3. Get the external IP address of the load balancer:

      export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
      echo http://${EXTERNAL_IP}:80
      
    4. Open the external IP address in your web browser:

      http://EXTERNAL_IP
      

      The chatbot responds with a message similar to the following:

      How can I help you?
      
    5. Ask questions about the content of the uploaded documents. If the chatbot can't find anything, it answers I don't know. For example, you might ask the following:

      You: Hi, what are Google plans for the future?
      

      An example of the chatbot's output is similar to the following:

      Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
      
    6. Ask the chatbot a question that isn't related to the context of the uploaded document. For example, you might ask the following:

      You: What are Google plans to colonize Mars?
      

      An example of the chatbot's output is similar to the following:

      Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
      

    Understand the application code

    This section explains how the application code works. There are three scripts inside the Docker images:

    • endpoint.py: receives Eventarc events on each document upload and starts the Kubernetes Jobs to process them.
    • embedding-job.py: downloads documents from the bucket, creates embeddings, and inserts them into the vector database.
    • chat.py: runs queries against the content of the stored documents.

    The following diagram shows the process of generating responses by using the document data:

    In the diagram, the application loads a PDF file, splits the file into chunks and then into vectors, and sends the vectors to a vector database. Later, a user asks the chatbot a question. The RAG chain uses semantic search to search the vector database, and then returns the context along with the question to the LLM. The LLM answers the question and stores it in the chat history.

    About endpoint.py

    This file processes Eventarc messages, creates a Kubernetes Job to embed the document, and accepts requests from anywhere on port 5001.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
            client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
            client.V1EnvVar(name="DATABASE_NAME", value="app"), 
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
            client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
            client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
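
    You can smoke-test a running endpoint.py instance through its /check route, for example by port-forwarding to the Deployment. This is a generic kubectl technique; the deployment name below is an assumption based on the manifests applied earlier:

    kubectl -n ${DB_NAMESPACE} port-forward deployment/embed-docs 5001:5001 &
    curl http://localhost:5001/check
    # {"Message": "Hi there"}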
    

    About embedding-job.py

    This file processes documents and sends them to the vector database.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # Read the job parameters that endpoint.py passes in as environment variables,
    # download the uploaded PDF, split it into chunks, embed the chunks, and store
    # them in the Qdrant collection. This variant follows the same pattern as the
    # Elasticsearch, PGVector, and Weaviate tabs, using the environment variables
    # that endpoint.py sets for the Qdrant job.
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import Qdrant
    from google.cloud import storage
    import os
    
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    # Download the uploaded PDF from the Cloud Storage bucket.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    # Split the document into chunks.
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    # Embed the chunks and insert them into the Qdrant collection.
    db = Qdrant.from_documents(
        documents,
        embeddings,
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("APIKEY"),
        collection_name=os.getenv("COLLECTION_NAME"),
    )
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    from google.cloud import storage
    import os
    
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    
    db = ElasticsearchStore.from_documents(
        documents,
        embeddings,
        es_connection=client,
        index_name=os.getenv("INDEX_NAME")
    )
    db.client.indices.refresh(index=os.getenv("INDEX_NAME"))
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores.pgvector import PGVector
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    for document in documents:
        document.page_content = document.page_content.replace('\x00', '')
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
    
    db = PGVector.from_documents(
        embedding=embeddings,
        documents=documents,
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        use_jsonb=True
    )
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    if not client.collections.exists("trainingdocs"):
        collection = client.collections.create(name="trainingdocs")
    db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    About chat.py

    This file configures the model to answer questions by using only the provided context and the previous answers. If the context or the conversation history doesn't match any data, the model answers I don't know.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores import Qdrant
    from qdrant_client import QdrantClient
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("APIKEY"),
    )
    collection_name = os.getenv("COLLECTION_NAME")
    vector_search = Qdrant(client, collection_name, embeddings=embedding_model)
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bob",
            human_prefix="User",
            k=3,
        )
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    vector_search = ElasticsearchStore(
        index_name=os.getenv("INDEX_NAME"),
        es_connection=client,
        embedding=embedding_model
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores.pgvector import PGVector
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
    
    vector_search = PGVector(
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        embedding_function=embedding_model,
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
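
    Before wiring retrieval into the Streamlit UI, it can help to smoke-test the vector store on its own. The following is a minimal sketch, assuming the same POSTGRES_HOST, DATABASE_NAME, USERNAME, PASSWORD, and COLLECTION_NAME environment variables used by the chatbot Deployment are set in your shell; the sample query string is only an illustration.

    import os

    from langchain_community.vectorstores.pgvector import PGVector
    from langchain_google_vertexai import VertexAIEmbeddings

    embedding_model = VertexAIEmbeddings("text-embedding-005")
    connection_string = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ["POSTGRES_HOST"],
        port=5432,
        database=os.environ["DATABASE_NAME"],
        user=os.environ["USERNAME"],
        password=os.environ["PASSWORD"],
    )
    store = PGVector(
        collection_name=os.environ["COLLECTION_NAME"],
        connection_string=connection_string,
        embedding_function=embedding_model,
    )

    # Print the top matches to confirm that document embeddings were stored.
    for doc in store.similarity_search("What do the uploaded documents cover?", k=4):
        print(doc.page_content[:120])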
    
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or the context is not relevant, respond with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    # Authenticate with the Weaviate API key and connect over plain HTTP and
    # gRPC to the in-cluster service endpoints.
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    
    # Attach to the existing "trainingdocs" index; the empty document list means
    # nothing new is indexed here.
    vector_search = WeaviateVectorStore.from_documents([], embedding_model, client=client, index_name="trainingdocs")
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
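
    In every variant, the {history} placeholder in the prompt is filled from a ConversationBufferWindowMemory, which keeps only the last k=3 question-and-answer pairs, so the prompt stays bounded however long the chat runs. The following is a minimal sketch of that behavior outside Streamlit, with illustrative inputs.

    from langchain.memory import ConversationBufferWindowMemory

    # Same configuration as the chatbot services above.
    memory = ConversationBufferWindowMemory(
        memory_key="history",
        ai_prefix="Bot",
        human_prefix="User",
        k=3,
    )

    for i in range(5):
        memory.save_context({"input": f"question {i}"}, {"output": f"answer {i}"})

    # Only the last three exchanges survive the window; questions 0 and 1
    # have already been dropped.
    print(memory.load_memory_variables({})["history"])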
    
    

    Clean up

    To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

    Delete the project

    The easiest way to avoid billing is to delete the project that you created for this tutorial.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

    If you deleted the project, your cleanup is complete. If you didn't delete the project, proceed to delete the individual resources.

    Delete individual resources

    1. Delete the Artifact Registry repository:

      gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
          --location=${REGION} \
          --async
      

      When prompted, type y.

    2. Delete the Cloud Storage bucket and the Eventarc trigger:

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage destroy \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      When prompted, type yes.

      Eventarc requires that you have a valid endpoint destination both when the trigger is created and when it is deleted.

    What's next