Build a RAG chatbot with GKE and Cloud Storage


In this guide, you learn how to integrate a large language model (LLM) application based on retrieval-augmented generation (RAG) with PDF files that you upload to a Cloud Storage bucket.

This guide uses a database as the storage and semantic search engine that holds the representations (embeddings) of the uploaded documents. You use the LangChain framework to interact with the embeddings, and you use Gemini models available through Vertex AI.

LangChain is a popular open-source Python framework that simplifies many machine learning tasks and provides interfaces for integrating with different vector databases and AI services.

This guide is intended for cloud platform administrators and architects, ML engineers, and MLOps (DevOps) professionals who are interested in deploying RAG LLM applications on GKE and Cloud Storage.

Objectives

In this guide, you learn how to do the following:

  • Build and deploy an application that creates document embeddings and stores them in a vector database.
  • Automate the application so that it processes new documents uploaded to a Cloud Storage bucket.
  • Deploy a chatbot application that uses semantic search to answer questions based on the document content.

Deployment architecture

In this guide, you create a Cloud Storage bucket, an Eventarc trigger, and the following services:

  • embed-docs: Eventarc triggers this service each time a user uploads a new document to the Cloud Storage bucket. The service starts a Kubernetes Job that creates embeddings for the uploaded document and inserts them into a vector database.
  • chatbot: This service answers natural-language questions about the uploaded documents by using semantic search and the Gemini API.

The following diagram shows the process of uploading and vectorizing documents:

In the diagram, the user uploads files to the Cloud Storage bucket. Eventarc subscribes to object metadataUpdated events for the bucket and uses Eventarc's event forwarder, a Kubernetes workload, to call the embed-docs service when you upload a new document. The service then creates embeddings for the uploaded document and stores them in a vector database by using the Vertex AI embedding model.

The following diagram shows the process of asking questions about the uploaded document content by using the chatbot service:

Users can ask questions in natural language, and the chatbot generates answers based solely on the content of the uploaded files. The chatbot retrieves context from the vector database by using semantic search, and then sends the question and the context to Gemini.

Costs

In this document, you use the following billable components of Google Cloud: GKE, Cloud Storage, Vertex AI, Cloud Build, Eventarc, and Artifact Registry.

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

In this guide, you use Cloud Shell to run commands. Cloud Shell is a shell environment for managing resources hosted on Google Cloud. It comes preinstalled with the Google Cloud CLI, kubectl, and Terraform command-line tools. If you don't use Cloud Shell, install the Google Cloud CLI.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Vertex AI, Cloud Build, Eventarc, and Artifact Registry APIs:

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/eventarc.admin. An example invocation follows the placeholder descriptions below.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
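
    For example, granting the Eventarc Admin role to the account from the example above looks like this:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="user:myemail@example.com" \
        --role=roles/eventarc.admin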

Create a cluster

    Create your Qdrant, Elasticsearch, Postgres, or Weaviate cluster:

    Qdrant

    Follow the instructions in Deploy a Qdrant vector database on GKE to create a Qdrant cluster running on a GKE cluster in Autopilot or Standard mode.

    Elasticsearch

    Follow the instructions in Deploy an Elasticsearch vector database on GKE to create an Elasticsearch cluster running on a GKE cluster in Autopilot or Standard mode.

    PGVector

    Follow the instructions in Deploy a PostgreSQL vector database on GKE to create a Postgres cluster with PGVector running on a GKE cluster in Autopilot or Standard mode.

    Weaviate

    Follow the instructions in Deploy a Weaviate vector database on GKE to create a Weaviate cluster running on a GKE cluster in Autopilot or Standard mode.

Set up your environment

    To set up your environment with Cloud Shell, follow these steps:

    1. Set environment variables for your project:

      Qdrant

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=qdrant
      export REGION=us-central1
      export DB_NAMESPACE=qdrant
      

      Replace PROJECT_ID with your Google Cloud project ID.

      Elasticsearch

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=elasticsearch
      export REGION=us-central1
      export DB_NAMESPACE=elastic
      

      Replace PROJECT_ID with your Google Cloud project ID.

      PGVector

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=postgres
      export REGION=us-central1
      export DB_NAMESPACE=pg-ns
      

      Replace PROJECT_ID with your Google Cloud project ID.

      Weaviate

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=weaviate
      export REGION=us-central1
      export DB_NAMESPACE=weaviate
      

      Replace PROJECT_ID with your Google Cloud project ID.

    2. Verify that your GKE cluster is running:

      gcloud container clusters list --project=${PROJECT_ID} --region=${REGION}
      

      The output is similar to the following:

      NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
      [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
      
    3. Clone the sample code repository from GitHub:

      git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
      
    4. Change to the databases directory:

      cd kubernetes-engine-samples/databases
      

Prepare your infrastructure

    Create an Artifact Registry repository, build the Docker images, and push them to Artifact Registry:

    1. Create an Artifact Registry repository:

      gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
          --repository-format=docker \
          --location=${REGION} \
          --description="Vector database images repository" \
          --async
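
      The --async flag makes the command return immediately. Before moving on, you can optionally confirm that the repository was created:

      gcloud artifacts repositories list --location=${REGION} --project=${PROJECT_ID}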
      
    2. Grant the storage.objectAdmin and artifactregistry.admin roles to the Compute Engine default service account so that Cloud Build can build and push the Docker images for the embed-docs and chatbot services:

      export PROJECT_NUMBER=PROJECT_NUMBER
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/storage.objectAdmin"
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/artifactregistry.admin"
      

      Replace PROJECT_NUMBER with your Google Cloud project number.
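
      If you don't have the project number at hand, you can look it up with this optional helper command:

      gcloud projects describe ${PROJECT_ID} --format="value(projectNumber)"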

    3. Build the Docker images for the embed-docs and chatbot services. The embed-docs image contains Python code both for the application that receives Eventarc forwarder requests and for the embedding job. Because the builds run asynchronously, a status check is shown after the commands below.

      Qdrant

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Elasticsearch

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      PGVector

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Weaviate

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
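
      Because the builds run with --async, the commands return before the images are ready. You can optionally watch the build status until all builds show SUCCESS:

      gcloud builds list --region=${REGION} --project=${PROJECT_ID}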
      
    4. Verify the images:

      gcloud artifacts docker images list $DOCKER_REPO \
          --project=$PROJECT_ID \
          --format="value(IMAGE)"
      

      The output is similar to the following:

      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
      
    5. Deploy a Kubernetes service account with permissions to run Kubernetes Jobs:

      Qdrant

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
      
    6. If you created the GKE cluster with Terraform and set create_service_account to true, a separate service account is created and used by the cluster and its nodes. Grant the artifactregistry.serviceAgent role to this Compute Engine service account so that the nodes can pull the embed-docs and chatbot images from the Artifact Registry repository that you created:

      export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
      --region=${REGION} \
      --format="value(nodeConfig.serviceAccount)")
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
      --role="roles/artifactregistry.serviceAgent"
      

      If you don't grant the service account access, your nodes might run into permission issues when they pull the images from Artifact Registry during the deployment of the embed-docs and chatbot services.
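
      If pulls fail, the affected Pods typically show an ImagePullBackOff status, which you can spot with an optional check:

      kubectl get pods -n ${DB_NAMESPACE}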

    7. Deploy a Kubernetes Deployment for the embed-docs and chatbot services. A Deployment is a Kubernetes API object that lets you run multiple replicas of Pods that are distributed among the nodes in a cluster:

      Qdrant

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
      
    8. Enable Eventarc triggers for GKE:

      gcloud eventarc gke-destinations init
      

      When prompted, enter y.

    9. Deploy the Cloud Storage bucket and create the Eventarc trigger with Terraform:

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage init
      terraform -chdir=vector-database/terraform/cloud-storage apply \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      When prompted, enter yes. The command might take several minutes to complete.

      Terraform creates the following resources:

      • A Cloud Storage bucket for uploading the documents
      • An Eventarc trigger
      • A Google Cloud service account named service_account_eventarc_name with permission to use Eventarc
      • A Google Cloud service account named service_account_bucket_name with permission to read the bucket and access Vertex AI models

      The output is similar to the following:

      ... # Several lines of output omitted
      
      Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
      
      ... # Several lines of output omitted
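
      To list the names of the resources that Terraform created, such as the generated service accounts, you can print the module's outputs. The exact output names depend on the module:

      terraform -chdir=vector-database/terraform/cloud-storage output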
      

Load documents and run chatbot queries

    Upload the demo documents, and then run queries to search them by using the chatbot:

    1. Upload the sample document carbon-free-energy.pdf to the bucket:

      gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
      
    2. Verify that the document embedding job completed successfully:

      kubectl get job -n ${DB_NAMESPACE}
      

      The output is similar to the following:

      NAME                            COMPLETIONS   DURATION   AGE
      docs-embedder1716570453361446   1/1           32s        71s
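
      To inspect what the job did, or to troubleshoot a failed run, read its logs, substituting the job name from your own output:

      kubectl logs -n ${DB_NAMESPACE} job/docs-embedder1716570453361446

      On success, the log ends with the lines that embedding-job.py prints, such as carbon-free-energy.pdf was successfully embedded and the vector count.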
      
    3. Get the external IP address of the load balancer:

      export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
      echo http://${EXTERNAL_IP}:80
      
    4. Open the external IP address in your web browser:

      http://EXTERNAL_IP
      

      The chatbot responds with a message similar to the following:

      How can I help you?
      
    5. Ask questions about the content of the uploaded documents. If the chatbot can't find an answer, it responds with I don't know. For example:

      You: Hi, what are Google plans for the future?
      

      The chatbot's output is similar to the following:

      Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
      
    6. Ask the chatbot a question that isn't related to the uploaded document. For example:

      You: What are Google plans to colonize Mars?
      

      The chatbot's output is similar to the following:

      Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
      

Application code

    This section shows how the application code works. There are three scripts inside the Docker images:

    • endpoint.py: receives Eventarc events for each document upload and starts the Kubernetes Jobs that process the documents.
    • embedding-job.py: downloads documents from the bucket, creates embeddings, and inserts them into the vector database.
    • chat.py: runs queries against the content of the stored documents.

    The following diagram shows the process of generating answers by using the document data:

    In the diagram, a PDF file is loaded, split into chunks, turned into vectors, and sent to a vector database. Later, a user asks the chatbot a question. The RAG chain uses semantic search to query the vector database, and then returns the retrieved context together with the question to the LLM. The LLM answers the question and stores it in the chat history.

About endpoint.py

    This file handles messages from Eventarc, creates a Kubernetes Job that embeds the document, and accepts requests from anywhere on port 5001. A sample request that exercises this endpoint is shown after the code listings below.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
            client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
            client.V1EnvVar(name="DATABASE_NAME", value="app"), 
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
            client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
            client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
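

    In normal operation, the Eventarc forwarder calls this endpoint. For local testing, you can imitate it by sending a POST request with the three fields that the handler reads; the bucket name, file name, and generation below are placeholders:

    curl -X POST http://localhost:5001/ \
      -H "Content-Type: application/json" \
      -d '{"bucket": "my-bucket", "name": "doc.pdf", "generation": "12345"}'

    The /check route returns a short JSON greeting and can double as a basic liveness probe.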
    

About embedding-job.py

    This file processes documents and sends them to the vector database. A note on running the job locally follows the code listings below.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import Qdrant
    from google.cloud import storage
    import os

    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")

    # Download the uploaded document from Cloud Storage.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)

    # Split the PDF into chunks and embed them with the Vertex AI model.
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)

    embeddings = VertexAIEmbeddings("text-embedding-005")

    # Insert the embeddings into the Qdrant collection. The URL, API key,
    # and collection name come from the environment variables that
    # endpoint.py sets on the Job.
    db = Qdrant.from_documents(
        documents,
        embeddings,
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("APIKEY"),
        collection_name=os.getenv("COLLECTION_NAME"),
    )

    print(filename + " was successfully embedded")
    print(f"# of vectors = {len(documents)}")
    
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    from google.cloud import storage
    import os
    
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    
    db = ElasticsearchStore.from_documents(
        documents,
        embeddings,
        es_connection=client,
        index_name=os.getenv("INDEX_NAME")
    )
    db.client.indices.refresh(index=os.getenv("INDEX_NAME"))
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores.pgvector import PGVector
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    for document in documents:
        document.page_content = document.page_content.replace('\x00', '')
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
    
    db = PGVector.from_documents(
        embedding=embeddings,
        documents=documents,
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        use_jsonb=True
    )
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    if not client.collections.exists("trainingdocs"):
        collection = client.collections.create(name="trainingdocs")
    db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

About chat.py

    This file configures the model to answer questions by using only the provided context and the previous answers. If the context or the conversation history doesn't match any data, the model returns I don't know.

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores import Qdrant
    from qdrant_client import QdrantClient
    import streamlit as st
    import os

    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".

            text_context:
            {context}

            conversation_history:
            {history}

            query:
            {query}
            """),
        ]
    )

    embedding_model = VertexAIEmbeddings("text-embedding-005")

    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("APIKEY"),
    )
    collection_name = os.getenv("COLLECTION_NAME")
    vector_search = Qdrant(client, collection_name, embeddings=embedding_model)
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])

    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bob",
            human_prefix="User",
            k=3,
        )
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})

        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)

        prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})

        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    vector_search = ElasticsearchStore(
        index_name=os.getenv("INDEX_NAME"),
        es_connection=client,
        embedding=embedding_model
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores.pgvector import PGVector
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")  # no trailing comma: this must be a string, not a tuple
    
    vector_search = PGVector(
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        embedding_function=embedding_model,
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    
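    If the chatbot answers "I don't know" for documents that you already uploaded, it can help to query the vector store directly. The following is a minimal smoke test, not part of the deployed service, that assumes the same environment variables (POSTGRES_HOST, DATABASE_NAME, USERNAME, PASSWORD, and COLLECTION_NAME) as the chatbot service:

    import os
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores.pgvector import PGVector

    # Sketch only: reuses the connection settings of the chatbot service above.
    store = PGVector(
        collection_name=os.environ["COLLECTION_NAME"],
        connection_string=PGVector.connection_string_from_db_params(
            driver="psycopg2",
            host=os.environ["POSTGRES_HOST"],
            port=5432,
            database=os.environ["DATABASE_NAME"],
            user=os.environ["USERNAME"],
            password=os.environ["PASSWORD"],
        ),
        embedding_function=VertexAIEmbeddings("text-embedding-005"),
    )

    # Print the best-matching chunks for a test query.
    for doc in store.similarity_search("test query", k=4):
        print(doc.page_content[:100])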

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or the context is not relevant, respond with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    
    vector_search = WeaviateVectorStore.from_documents([], embedding_model, client=client, index_name="trainingdocs")
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    
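    Before you troubleshoot the Weaviate variant, you can verify that the connection parameters resolve to a reachable Weaviate instance. The following short sketch assumes the same WEAVIATE_ENDPOINT, WEAVIATE_GRPC_ENDPOINT, and APIKEY environment variables as the service above; is_ready() and close() are methods of the Weaviate Python client:

    import os
    import weaviate
    from weaviate.connect import ConnectionParams

    # Sketch only: same connection parameters as the chatbot service above.
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY")),
    )
    client.connect()
    print(client.is_ready())  # True if the instance answers health checks
    client.close()            # release the HTTP and gRPC connections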

    Clean up

    To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

    Delete the project

    The easiest way to avoid further charges is to delete the project that you created for this tutorial.

    Delete the Google Cloud project:

    gcloud projects delete PROJECT_ID

    Replace PROJECT_ID with the ID of the project that you created for this tutorial.

    If you deleted the project, your cleanup is complete. If you kept the project, continue by deleting the individual resources.

    Delete individual resources

    1. Delete the Artifact Registry repository:

      gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
          --location=${REGION} \
          --async
      

      When prompted, enter y.

    2. Delete the Cloud Storage bucket and the Eventarc trigger:

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage destroy \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      When prompted, enter yes.

      Eventarc requires a valid endpoint target both when a trigger is created and when it is deleted.

    What's next