GKE と Cloud Storage を使用して RAG chatbot を構築する


このチュートリアルでは、検索拡張生成(RAG)に基づく大規模言語モデル(LLM)アプリケーションと、Cloud Storage バケットにアップロードされた PDF ファイルを統合する方法について説明します。

このガイドでは、アップロードされたドキュメントの表現(エンベディング)を保持するストレージとセマンティック検索エンジンとしてデータベースを使用します。Langchain フレームワークを使用してエンベディングを操作し、Vertex AI で利用可能な Gemini モデルを使用します。

Langchain は、多くの ML タスクを簡素化し、さまざまなベクトル データベースや AI サービスと統合するためのインターフェースを備えた、よく利用されているオープンソースの Python フレームワークです。

このチュートリアルは、GKE と Cloud Storage に RAG LLM アプリケーションをデプロイすることに関心があるクラウド プラットフォームの管理者とアーキテクトML エンジニア、MLOps(DevOps)の専門家を対象としています。

目標

このチュートリアルでは、以下の方法について学習します。

  • ドキュメント エンベディングを作成してベクトル データベースに保存するアプリケーションのビルドとデプロイを行う。
  • アプリケーションを自動化し、Cloud Storage バケットへの新規ドキュメントのアップロードをトリガーする。
  • セマンティック検索を利用してドキュメントのコンテンツに基づいて質問に答える chatbot アプリケーションをデプロイする。

Deployment のアーキテクチャ

このチュートリアルでは、Cloud Storage バケット、Eventarc トリガー、次の Service を作成します。

  • embed-docs: ユーザーが Cloud Storage バケットに新しいドキュメントをアップロードするたびに、Eventarc はこの Service をトリガーします。この Service は Kubernetes Job を開始して、アップロードされたドキュメントのエンベディングを作成し、ベクトル データベースに格納します。
  • chatbot: この Service は、セマンティック検索と Gemini API を使用して、アップロードされたドキュメントに関する自然言語の質問に回答します。

次の図は、ドキュメントのアップロードとベクトル化のプロセスを示しています。

この図では、ユーザーが Cloud Storage バケットにファイルをアップロードしています。Eventarc は、バケットのオブジェクト metadataUpdated イベントをサブスクライブしています。ユーザーが新しいドキュメントをアップロードすると、Eventarc は Eventarc のイベント フォワーダー(Kubernetes ワークロード)を使用して embed-docs Service を呼び出します。Service が、アップロードされたドキュメントのエンベディングを作成します。embed-docs Service は、Vertex AI エンベディング モデルを使用して、エンベディングをベクトル データベースに保存します。

次の図は、chatbot Service を使用して、アップロードされたドキュメント コンテンツについて質問するプロセスを示しています。

ユーザーは自然言語を使用して質問できます。chatbot は、アップロードされたファイルのコンテンツのみに従って回答を生成します。chatbot は、セマンティック検索を使用してベクトル データベースからコンテキストを取得し、質問とコンテキストを Gemini に送信します。

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

このチュートリアルでは、Cloud Shell を使用してコマンドを実行します。Cloud Shell は、 Google Cloudでホストされているリソースを管理するためのシェル環境です。Cloud Shell には、Google Cloud CLIkubectlTerraform のコマンドライン ツールがプリインストールされています。Cloud Shell を使用しない場合は、Google Cloud CLI をインストールします。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. 外部 ID プロバイダ(IdP)を使用している場合は、まずフェデレーション ID を使用して gcloud CLI にログインする必要があります。

  4. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Vertex AI, Cloud Build, Eventarc, Artifact Registry APIs:

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  8. Install the Google Cloud CLI.

  9. 外部 ID プロバイダ(IdP)を使用している場合は、まずフェデレーション ID を使用して gcloud CLI にログインする必要があります。

  10. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Vertex AI, Cloud Build, Eventarc, Artifact Registry APIs:

    gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
  14. Grant roles to your user account. Run the following command once for each of the following IAM roles: eventarc.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  15. クラスタを作成する

    Qdrant、Elasticsearch、または Postgres クラスタを作成します。

    Qdrant

    GKE に Qdrant ベクトル データベースをデプロイするの手順に沿って、Autopilot モードまたは Standard モードの GKE クラスタで実行される Qdrant クラスタを作成します。

    Elasticsearch

    GKE に Elasticsearch ベクトル データベースをデプロイするの手順に沿って、Autopilot モードまたは Standard モードの GKE クラスタで実行される Elasticsearch クラスタを作成します。

    PGVector

    GKE に PostgreSQL ベクトル データベースをデプロイするの手順に沿って、Autopilot モードまたは Standard モードの GKE クラスタで PGVector を実行する Postgres クラスタを作成します。

    Weaviate

    GKE に Weaviate ベクトル データベースをデプロイするの手順に沿って、Autopilot モードまたは Standard モードの GKE クラスタで実行される Weaviate クラスタを作成します。

    環境を設定する

    Cloud Shell を使用して環境を設定します。

    1. プロジェクトの環境変数を設定します。

      Qdrant

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=qdrant
      export REGION=us-central1
      export DB_NAMESPACE=qdrant
      

      PROJECT_ID は、実際のGoogle Cloud プロジェクト ID に置き換えます。

      Elasticsearch

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=elasticsearch
      export REGION=us-central1
      export DB_NAMESPACE=elastic
      

      PROJECT_ID は、実際のGoogle Cloud プロジェクト ID に置き換えます。

      PGVector

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=postgres
      export REGION=us-central1
      export DB_NAMESPACE=pg-ns
      

      PROJECT_ID は、実際のGoogle Cloud プロジェクト ID に置き換えます。

      Weaviate

      export PROJECT_ID=PROJECT_ID
      export KUBERNETES_CLUSTER_PREFIX=weaviate
      export REGION=us-central1
      export DB_NAMESPACE=weaviate
      

      PROJECT_ID は、実際のGoogle Cloud プロジェクト ID に置き換えます。

    2. GKE クラスタが実行されていることを確認します。

      gcloud container clusters list --project=${PROJECT_ID} --region=${REGION}
      

      出力は次のようになります。

      NAME                                    LOCATION        MASTER_VERSION      MASTER_IP     MACHINE_TYPE  NODE_VERSION        NUM_NODES STATUS
      [KUBERNETES_CLUSTER_PREFIX]-cluster   us-central1   1.30.1-gke.1329003  <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003   6        RUNNING
      
    3. GitHub からサンプルコード リポジトリのクローンを作成します。

      git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
      
    4. databases ディレクトリに移動します。

      cd kubernetes-engine-samples/databases
      

    インフラストラクチャを準備する

    Artifact Registry リポジトリを作成し、Docker イメージをビルドして、Artifact Registry に push します。

    1. Artifact Registry リポジトリを作成します。

      gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
          --repository-format=docker \
          --location=${REGION} \
          --description="Vector database images repository" \
          --async
      
    2. Cloud Build を使用して embed-docs Service と chatbot Service の Docker イメージのビルドと push ができるように、Compute Engine サービス アカウントに storage.objectAdmin 権限と artifactregistry.admin 権限を設定します。

      export PROJECT_NUMBER=PROJECT_NUMBER
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/storage.objectAdmin"
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
      --role="roles/artifactregistry.admin"
      

      PROJECT_NUMBER は、使用するGoogle Cloud プロジェクト番号に置き換えます。

    3. embed-docs Service と chatbot Service の Docker イメージをビルドします。embed-docs イメージには、Eventarc 転送リクエストを受信するアプリケーションとエンベディング ジョブの両方の Python コードが含まれています。

      Qdrant

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Elasticsearch

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      PGVector

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      

      Weaviate

      export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
      gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
        --tag ${DOCKER_REPO}/chatbot:1.0 --async
      gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
        --tag ${DOCKER_REPO}/embed-docs:1.0 --async
      
    4. イメージを確認します。

      gcloud artifacts docker images list $DOCKER_REPO \
          --project=$PROJECT_ID \
          --format="value(IMAGE)"
      

      出力は次のようになります。

      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
      $REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs
      
    5. Kubernetes Job の実行権限を持つ Kubernetes サービス アカウントをデプロイします。

      Qdrant

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -
      
    6. Terraform を使用して GKE クラスタを作成し、create_service_account を true に設定すると、別のサービス アカウントが作成され、クラスタとノードで使用されます。この Compute Engine サービス アカウントに artifactregistry.serviceAgent ロールを付与して、ノードが embed-docschatbot 用に作成された Artifact Registry からイメージを pull できるようにします。

      export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
      --region=${REGION} \
      --format="value(nodeConfig.serviceAccount)")
      
      gcloud projects add-iam-policy-binding ${PROJECT_ID}  \
      --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
      --role="roles/artifactregistry.serviceAgent"
      

      サービス アカウントへのアクセス権を付与しないと、embed-docs サービスと chatbot サービスのデプロイ時に Artifact Registry からイメージを pull しようとすると、ノードに権限の問題が発生する可能性があります。

    7. embed-docs Service と chatbot Service の Kubernetes Deployment をデプロイします。Deployment は、クラスタ内のノードに分散された Pod の複数のレプリカを実行できる Kubernetes API オブジェクトです。

      Qdrant

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -
      

      Elasticsearch

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -
      

      PGVector

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -
      

      Weaviate

      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
      sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -
      
    8. GKE の Eventarc トリガーを有効にします。

      gcloud eventarc gke-destinations init
      

      プロンプトが表示されたら、「y」と入力します。

    9. Terraform を使用して Cloud Storage バケットをデプロイし、Eventarc トリガーを作成します。

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage init
      terraform -chdir=vector-database/terraform/cloud-storage apply \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      プロンプトが表示されたら、「yes」と入力します。コマンドが完了するまで数分かかることがあります。

      Terraform が次のリソースを作成します。

      • ドキュメントをアップロードする Cloud Storage バケット
      • Eventarc トリガー
      • Eventarc の使用権限を持つ service_account_eventarc_name という名前の Google Cloud サービス アカウント。
      • バケットの読み取りと Vertex AI モデルへのアクセス権を持つ service_account_bucket_name という名前の Google Cloud サービス アカウント。

      出力は次のようになります。

      ... # Several lines of output omitted
      
      Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
      
      ... # Several lines of output omitted
      

    ドキュメントを読み込み、chatbot クエリを実行する

    デモ用のドキュメントをアップロードしてクエリを実行し、chatbot を使用してデモ用ドキュメントを検索します。

    1. サンプル ドキュメントを carbon-free-energy.pdf バケットにアップロードします。

      gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs
      
    2. ドキュメント エンベディング ジョブが正常に完了したことを確認します。

      kubectl get job -n ${DB_NAMESPACE}
      

      出力は次のようになります。

      NAME                            COMPLETIONS   DURATION   AGE
      docs-embedder1716570453361446   1/1           32s        71s
      
    3. ロードバランサの外部 IP アドレスを取得します。

      export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
      echo http://${EXTERNAL_IP}:80
      
    4. この外部 IP アドレスをウェブブラウザで開きます。

      http://EXTERNAL_IP
      

      chatbot から次のようなメッセージが返されます。

      How can I help you?
      
    5. アップロードされたドキュメントの内容について質問します。何も見つけられない場合、chatbot は「I don't know」と回答します。たとえば、次のような質問をします。

      You: Hi, what are Google plans for the future?
      

      chatbot から次のような出力が表示されます。

      Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.
      
    6. アップロードされたドキュメントのコンテキストに関係のない質問を chatbot にします。たとえば、次のような質問をします。

      You: What are Google plans to colonize Mars?
      

      chatbot から次のような出力が表示されます。

      Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
      

    アプリケーション コードについて

    このセクションでは、アプリケーション コードの動作について説明します。Docker イメージ内には 3 つのスクリプトがあります。

    • endpoint.py: ドキュメントのアップロードごとに Eventarc イベントを受信し、Kubernetes Job を開始して処理します。
    • embedding-job.py: バケットからドキュメントをダウンロードし、エンベディングを作成して、ベクトル データベースに格納します。
    • chat.py: 保存されているドキュメントのコンテンツにクエリを実行します。

    次の図は、ドキュメント データに基づいて回答を生成するプロセスを示しています。

    この図では、アプリケーションが PDF ファイルを読み込み、ファイルをチャンクに分割してからベクトルに分割し、そのベクトルをベクトル データベースに送信しています。その後、ユーザーが chatbot に質問します。RAG チェーンは、セマンティック検索を使用してベクトル データベースを検索し、質問と一緒にコンテキストも LLM に返します。LLM が質問に回答し、質問をチャットの履歴に保存します。

    endpoint.py について

    このファイルは、Eventarc からのメッセージを処理し、ドキュメントを埋め込む Kubernetes Job を作成します。また、ポート 5001 でどこからでもリクエストを受け付けます。

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
            client.V1EnvVar(name="INDEX_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
            client.V1EnvVar(name="DATABASE_NAME", value="app"), 
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))), 
            client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
            client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    embedding-job.py について

    このファイルはドキュメントを処理し、ベクトル データベースに送信します。

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores import Qdrant
    from qdrant_client import QdrantClient
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    client = QdrantClient(
        url=os.getenv("QDRANT_URL"),
        api_key=os.getenv("APIKEY"),
    )
    collection_name = os.getenv("COLLECTION_NAME")
    vector_search = Qdrant(client, collection_name, embeddings=embedding_model)
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bob",
            human_prefix="User",
            k=3,
        )
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    from google.cloud import storage
    import os
    
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    
    db = ElasticsearchStore.from_documents(
        documents,
        embeddings,
        es_connection=client,
        index_name=os.getenv("INDEX_NAME")
    )
    db.client.indices.refresh(index=os.getenv("INDEX_NAME"))
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores.pgvector import PGVector
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    for document in documents:
        document.page_content = document.page_content.replace('\x00', '')
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
    
    db = PGVector.from_documents(
        embedding=embeddings,
        documents=documents,
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        use_jsonb=True
    )
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.document_loaders import PyPDFLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    from google.cloud import storage
    import os
    bucketname = os.getenv("BUCKET_NAME")
    filename = os.getenv("FILE_NAME")
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketname)
    blob = bucket.blob(filename)
    blob.download_to_filename("/documents/" + filename)
    
    loader = PyPDFLoader("/documents/" + filename)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    documents = loader.load_and_split(text_splitter)
    
    embeddings = VertexAIEmbeddings("text-embedding-005")
    
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    if not client.collections.exists("trainingdocs"):
        collection = client.collections.create(name="trainingdocs")
    db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")
    
    print(filename + " was successfully embedded") 
    print(f"# of vectors = {len(documents)}")
    
    

    chat.py について

    このファイルは、提供されたコンテキストと以前の回答のみを使用して質問に回答するようにモデルを構成します。コンテキストまたは会話履歴がデータと一致しないと、モデルは I don't know を返します。

    Qdrant

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from flask import Flask, jsonify
    from flask import request
    import logging
    import sys,os, time
    from kubernetes import client, config, utils
    import kubernetes.client
    from kubernetes.client.rest import ApiException
    
    
    app = Flask(__name__)
    @app.route('/check')
    def message():
        return jsonify({"Message": "Hi there"})
    
    
    @app.route('/', methods=['POST'])
    def bucket():
        request_data = request.get_json()
        print(request_data)
        bckt = request_data['bucket']
        f_name = request_data['name']
        id = request_data['generation'] 
        kube_create_job(bckt, f_name, id)
        return "ok"
    
    # Set logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Setup K8 configs
    config.load_incluster_config()
    def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    
        body = client.V1Job(api_version="batch/v1", kind="Job")
        body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
        body.status = client.V1JobStatus()
    
        template = client.V1PodTemplate()
        template.template = client.V1PodTemplateSpec()
        env_list = [
            client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
            client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"), 
            client.V1EnvVar(name="FILE_NAME", value=f_name), 
            client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
            client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))), 
        ]
    
        container = client.V1Container(name=container_name, image=container_image, env=env_list)
        template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    
        body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
        return body
    def kube_test_credentials():
        try: 
            api_response = api_instance.get_api_resources()
            logging.info(api_response)
        except ApiException as e:
            print("Exception when calling API: %s\n" % e)
    
    def kube_create_job(bckt, f_name, id):
        container_image = os.getenv("JOB_IMAGE")
        namespace = os.getenv("JOB_NAMESPACE")
        name = "docs-embedder" + id
        body = kube_create_job_object(name, container_image, bckt, f_name)
        v1=client.BatchV1Api()
        try: 
            v1.create_namespaced_job(namespace, body, pretty=True)
        except ApiException as e:
            print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
        return
    
    if __name__ == '__main__':
        app.run('0.0.0.0', port=5001, debug=True)
    

    Elasticsearch

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from elasticsearch import Elasticsearch
    from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    client = Elasticsearch(
        [os.getenv("ES_URL")], 
        verify_certs=False, 
        ssl_show_warn=False,
        basic_auth=("elastic", os.getenv("PASSWORD"))
    )
    vector_search = ElasticsearchStore(
        index_name=os.getenv("INDEX_NAME"),
        es_connection=client,
        embedding=embedding_model
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    PGVector

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    from langchain_community.vectorstores.pgvector import PGVector
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver="psycopg2",
        host=os.environ.get("POSTGRES_HOST"),
        port=5432,
        database=os.environ.get("DATABASE_NAME"),
        user=os.environ.get("USERNAME"),
        password=os.environ.get("PASSWORD"),
    )
    COLLECTION_NAME = os.environ.get("COLLECTION_NAME"),
    
    vector_search = PGVector(
        collection_name=COLLECTION_NAME,
        connection_string=CONNECTION_STRING,
        embedding_function=embedding_model,
    )
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    Weaviate

    # Copyright 2024 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    from langchain_google_vertexai import ChatVertexAI
    from langchain.prompts import ChatPromptTemplate
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain.memory import ConversationBufferWindowMemory
    import weaviate
    from weaviate.connect import ConnectionParams
    from langchain_weaviate.vectorstores import WeaviateVectorStore
    import streamlit as st
    import os
    
    vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
    prompt_template = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
            ("human", """
            The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
            Base your response on the provided text context and the current conversation history to answer the query.
            Select the most relevant information from the context.
            Generate a draft response using the selected information. Remove duplicate content from the draft response.
            Generate your final response after adjusting it to increase accuracy and relevance.
            Now only show your final response!
            If you do not know the answer or context is not relevant, response with "I don't know".
    
            text_context:
            {context}
    
            conversation_history:
            {history}
    
            query:
            {query}
            """),
        ]
    )
    
    embedding_model = VertexAIEmbeddings("text-embedding-005")
    
    auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
    client = weaviate.WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=os.getenv("WEAVIATE_ENDPOINT"),
            http_port="80",
            http_secure=False,
            grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
            grpc_port="50051",
            grpc_secure=False,
        ),
        auth_client_secret=auth_config
    )
    client.connect()
    
    vector_search = WeaviateVectorStore.from_documents([],embedding_model,client=client, index_name="trainingdocs")
    
    def format_docs(docs):
        return "\n\n".join([d.page_content for d in docs])
    
    st.title("🤖 Chatbot")
    if "messages" not in st.session_state:
        st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
    
    if "memory" not in st.session_state:
        st.session_state["memory"] = ConversationBufferWindowMemory(
            memory_key="history",
            ai_prefix="Bot",
            human_prefix="User",
            k=3,
        )
    
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])
    
    if chat_input := st.chat_input():
        with st.chat_message("human"):
            st.write(chat_input)
            st.session_state.messages.append({"role": "human", "content": chat_input})
    
        found_docs = vector_search.similarity_search(chat_input)
        context = format_docs(found_docs)
    
        prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
        with st.chat_message("ai"):
            with st.spinner("Typing..."):
                content = ""
                with st.empty():
                    for chunk in vertexAI.stream(prompt_value):
                        content += chunk.content
                        st.write(content)
                st.session_state.messages.append({"role": "ai", "content": content})
    
        st.session_state.memory.save_context({"input": chat_input}, {"output": content})
    
    

    クリーンアップ

    このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

    プロジェクトを削除する

    課金が発生しないようにする最も簡単な方法は、このチュートリアル用に作成したプロジェクトを削除することです。

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

    プロジェクトを削除すると、クリーンアップが完了します。プロジェクトを削除していない場合は、個々のリソースを削除します。

    リソースを個別に削除する

    1. Artifact Registry リポジトリを削除します。

      gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
          --location=${REGION} \
          --async
      

      プロンプトが表示されたら、「y」と入力します。

    2. Cloud Storage バケットと Eventarc トリガーを削除します。

      export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
      terraform -chdir=vector-database/terraform/cloud-storage destroy \
        -var project_id=${PROJECT_ID} \
        -var region=${REGION} \
        -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
        -var db_namespace=${DB_NAMESPACE}
      

      プロンプトが表示されたら、「yes」と入力します。

      Eventarc では、作成時と削除時の両方で有効なエンドポイント ターゲットが必要です。

    次のステップ