创建自定义连接器

本页面介绍了如何创建自定义连接器。

准备工作

在开始之前,请确保您已执行以下操作:

  • 检查您的 Google Cloud 项目是否已启用结算功能

  • 安装初始化 Google Cloud CLI。请确保已针对您的项目进行身份验证。

  • 获取您的 Google Cloud 项目的 Discovery Engine 管理员访问权限。

  • 获取第三方数据源的访问凭证(例如 API 密钥或数据库身份验证)。

  • 制定明确的数据映射计划。这必须包括要编入索引的字段以及如何表示访问权限控制(包括第三方身份)。

创建基本连接器

本部分演示了如何使用所选语言创建自定义连接器。此处所述的原则和模式适用于任何外部系统。只需使用所选语言调整特定来源的 API 调用和数据转换,即可创建基本连接器。

提取数据

首先,从第三方数据源检索数据。在此示例中,我们将演示如何使用分页功能来提取帖子。对于生产环境,我们建议使用流式处理方法来处理大型数据集。这样可以防止一次性加载所有数据时可能出现的内存问题。

Python

    def fetch_posts(base_url: str, per_page: int = 15) -> List[dict]:
        #Fetch all posts from the given site.#
        url = base_url.rstrip("/") + "/wp-json/wp/v2/posts"
        posts: List[dict] = []
        page = 1
        while True:
            resp = requests.get(
                url,
                params={"page": page, "per_page": per_page},
            )
            resp.raise_for_status()
            batch = resp.json()
            posts.extend(batch)
            if len(batch) < per_page:
                break
            page += 1
        return posts

转换数据

如需将源数据转换为 Discovery Engine 文档格式,请按以下示例载荷所示设计文档结构。您可以根据需要添加任意数量的键值对。例如,您可以添加完整内容以进行全面搜索。或者,您也可以添加结构化字段以进行分面搜索,或同时添加两者的组合。

Python

    def convert_posts_to_documents(posts: List[dict]) -> List[discoveryengine.Document]:
        # Convert WP posts into Discovery Engine Document messages.
        docs: List[discoveryengine.Document] = []
        for post in posts:
            payload = {
                "title": post.get("title", {}).get("rendered"),
                "body": post.get("content", {}).get("rendered"),
                "url": post.get("link"),
                "author": post.get("author"),
                "categories": post.get("categories"),
                "tags": post.get("tags"),
                "date": post.get("date"),
            }
            doc = discoveryengine.Document(
                id=str(post["id"]),
                json_data=json.dumps(payload),
            )
            docs.append(doc)
        return docs

检索或创建身份存储区

如需管理用户身份和群组以进行访问权限控制,您必须检索或创建身份存储区。此函数通过 ID、项目和位置获取现有身份存储区。如果身份存储区不存在,则会创建并返回一个新的空身份存储区。

Python

    def get_or_create_ims_data_store(
        project_id: str,
        location: str,
        identity_mapping_store_id: str,
    ) -> discoveryengine.DataStore:
      """Get or create a DataStore."""
      # Initialize the client
      client_ims = discoveryengine.IdentityMappingStoreServiceClient()
      # Construct the parent resource name
      parent_ims = client_ims.location_path(project=project_id, location=location)

      try:
        # Create the request object
        name = f"projects/{project_id}/locations/{location}/identityMappingStores/{identity_mapping_store_id}"
        request = discoveryengine.GetIdentityMappingStoreRequest(
            name=name,
        )
        return client_ims.get_identity_mapping_store(request=request)
      except:
        # Create the IdentityMappingStore object (it can be empty for basic creation)
        identity_mapping_store = discoveryengine.IdentityMappingStore()
        # Create the request object
        request = discoveryengine.CreateIdentityMappingStoreRequest(
            parent=parent_ims,
            identity_mapping_store=identity_mapping_store,
            identity_mapping_store_id=identity_mapping_store_id,
        )
        return client_ims.create_identity_mapping_store(request=request)

get_or_create_ims_data_store 函数使用以下键变量:

  • project_id:您的 Google Cloud 项目的 ID。
  • location:身份映射存储区的 Google Cloud 位置。
  • identity_mapping_store_id:身份存储区的唯一标识符。
  • client_ims:discoveryengine.IdentityMappingStoreServiceClient 的实例,用于与身份存储区 API 进行交互。
  • parent_ims:父级位置的资源名称,使用 client_ims.location_path 构建。
  • name:身份映射存储区的完整资源名称,用于 GetIdentityMappingStoreRequest。

将身份映射注入到身份存储区

如需将身份映射条目加载到指定的身份存储区,请使用此函数。该函数接受身份映射条目列表,并启动内嵌导入操作。这对于建立访问权限控制和个性化所需的“用户-群组-外部身份”关系至关重要。

Python

def load_ims_data(
    ims_store: discoveryengine.DataStore,
    id_mapping_data: list[discoveryengine.IdentityMappingEntry],
) -> discoveryengine.DataStore:
  """Get the IMS data store."""
  # Initialize the client
  client_ims = discoveryengine.IdentityMappingStoreServiceClient()

  #  Create the InlineSource object
  inline_source = discoveryengine.ImportIdentityMappingsRequest.InlineSource(
      identity_mapping_entries=id_mapping_data
  )

  # Create the main request object
  request_ims = discoveryengine.ImportIdentityMappingsRequest(
      identity_mapping_store=ims_store.name,
      inline_source=inline_source,
  )

  try:
    # Create the InlineSource object, which holds your list of entries
    operation = client_ims.import_identity_mappings(
        request=request_ims,
    )
    result = operation.result()
    return result

  except Exception as e:
    print(f"IMS Load Error: {e}")
    result = operation.result()
    return result

load_ims_data 函数使用以下键变量:

  • ims_store:discoveryengine.DataStore 对象,表示加载数据的身份映射存储区。
  • id_mapping_data:discoveryengine.IdentityMappingEntry 对象的列表,每个对象都包含一个外部身份及其对应的用户或群组 ID。
  • result:类型为 discoveryengine.DataStore 的返回值。

创建数据存储区

如需使用自定义连接器,您必须为内容初始化数据存储区。请将 default_collection 用于自定义连接器。IndustryVertical 参数用于针对特定应用场景自定义数据存储区的行为。GENERIC 适用于大多数情况。不过,您可以为特定行业选择其他值,例如 MEDIAHEALTHCARE_FHIR。根据项目的命名惯例和要求配置显示名称和其他属性。

Python

def get_or_create_data_store(
    project_id: str,
    location: str,
    display_name: str,
    data_store_id: str,
    identity_mapping_store: str,
) -> discoveryengine.DataStore:
  """Get or create a DataStore."""
  client = discoveryengine.DataStoreServiceClient()
  ds_name = client.data_store_path(project_id, location, data_store_id)
  try:
    result = client.get_data_store(request={"name": ds_name})
    return result
  except:
    parent = client.collection_path(project_id, location, "default_collection")
    operation = client.create_data_store(
        request={
            "parent": parent,
            "data_store": discoveryengine.DataStore(
                display_name=display_name,
                acl_enabled=True,
                industry_vertical=discoveryengine.IndustryVertical.GENERIC,
                identity_mapping_store=identity_mapping_store,
            ),
            "data_store_id": data_store_id,
        }
    )
    result = operation.result()
    return result

get_or_create_data_store 函数使用以下键变量:

  • project_id:您的 Google Cloud 项目的 ID。
  • location:数据存储区的 Google Cloud位置。
  • display_name:数据存储区的人类可读显示名称。
  • data_store_id:数据存储区的唯一标识符。
  • identity_mapping_store:要绑定的身份映射存储区的资源名称。
  • result:类型为 discoveryengine.DataStore 的返回值。

内嵌上传文档

如需将文档直接发送到 Discovery Engine,请使用内嵌上传功能。此方法默认使用增量协调模式,不支持完全协调模式。在增量模式下,系统会添加新文档并更新现有文档,但不会删除来源中不再存在的文档。完全协调模式会将数据存储区与您的源数据同步,包括删除来源中不再存在的文档。

增量协调非常适合处理频繁、少量数据更改的系统,例如 CRM。无需同步整个数据库,只需发送特定更改,从而加快流程并提高效率。您仍可以定期执行完全同步,以保持整体数据完整性。

Python

    def upload_documents_inline(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        documents: List[discoveryengine.Document],
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Inline import of Document messages."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            inline_source=discoveryengine.ImportDocumentsRequest.InlineSource(
                documents=documents,
            ),
        )
        operation = client.import_documents(request=request)
        operation.result()
        result = operation.metadata
        return result

upload_documents_inline 函数使用以下键变量:

  • project_id:您的 Google Cloud 项目的 ID。
  • location:数据存储区的 Google Cloud 位置。
  • data_store_id:数据存储区的 ID。
  • branch_id:数据存储区中分支的 ID(通常为“0”)。
  • documents:要上传的 discoveryengine.Document 对象的列表。
  • result:类型为 discoveryengine.ImportDocumentsMetadata 的返回值。

验证连接器

如需验证连接器是否按预期运行,请执行测试运行,以确保从来源到 Discovery Engine 的数据传输正确。

Python

    SITE = "https://altostrat.com"
    PROJECT_ID = "ucs-3p-connectors-testing"
    LOCATION = "global"
    IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
    DATA_STORE_ID = "my-acl-ds-id1"
    BRANCH_ID = "0"

    posts = fetch_posts(SITE)
    docs = convert_posts_to_documents(posts)
    print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")

    try:
      # Step #1: Retrieve an existing identity mapping store or create a new identity mapping store
      ims_store = get_or_create_ims_data_store(PROJECT_ID, LOCATION, IDENTITY_MAPPING_STORE_ID)
      print(f"STEP #1: IMS Store Retrieval/Creation: {ims_store}")

      RAW_IDENTITY_MAPPING_DATA = [
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_1",
              user_id="testuser1@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              user_id="testuser2@example.com",
          ),
          discoveryengine.IdentityMappingEntry(
              external_identity="external_id_2",
              group_id="testgroup1@example.com",
          )
      ]

      # Step #2: Load IMS Data
      response = load_ims_data(ims_store, RAW_IDENTITY_MAPPING_DATA)
      print(
          "\nStep #2: Load Data in IMS Store successful.", response
      )

      # Step #3: Create Entity Data Store & Bind IMS Data Store
      data_store =  get_or_create_data_store(PROJECT_ID, LOCATION, "my-acl-datastore", DATA_STORE_ID, ims_store.name)
      print("\nStep #3: Entity Data Store Create Result: ", data_store)

      metadata = upload_documents_inline(
          PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, docs
      )
      print(f"Uploaded {metadata.success_count} documents inline.")

    except gcp_exceptions.GoogleAPICallError as e:
      print(f"\n--- API Call Failed ---")
      print(f"Server Error Message: {e.message}")
      print(f"Status Code: {e.code}")

    except Exception as e:
      print(f"An error occurred: {e}")

验证连接器代码是否使用以下键变量:

  • SITE:第三方数据源的基础网址。
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • LOCATION:资源所在的 Google Cloud 位置。
  • IDENTITY_MAPPING_STORE_ID:身份映射存储区的唯一 ID。
  • DATA_STORE_ID:数据存储区的唯一 ID。
  • BRANCH_ID:数据存储区中分支的 ID。
  • posts:存储从第三方来源提取的帖子。
  • docs:以 discoveryengine.Document 格式存储转换后的文档。
  • ims_store:已检索或创建的 discoveryengine.DataStore 对象,用于身份映射。
  • RAW_IDENTITY_MAPPING_DATA:discoveryengine.IdentityMappingEntry 对象的列表。

预期输出:

Shell

  Fetched 20 posts and converted to 20 documents.
  STEP #1: IMS Store Retrieval/Creation: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17"
  Step #2: Load Data in IMS Store successful.
  Step #3: Entity Data Store Create Result: "projects/ <Project Number>/locations/global/collections/default_collection/dataStores/my-acl-ds-id1"
  display_name: "my-acl-datastore"
  industry_vertical: GENERIC
  create_time {
    seconds: 1760906997
    nanos: 192641000
  }
  default_schema_id: "default_schema"
  acl_enabled: true
  identity_mapping_store: "projects/ <Project Number>/locations/global/identityMappingStores/your-unique-ims-id17".
  Uploaded 20 documents inline.

此时,您还可以在 Google Google Cloud 控制台中看到数据存储区:

自定义连接器数据存储区
自定义连接器数据存储区。

创建使用 Google Cloud Storage 上传功能的连接器

虽然内嵌导入功能非常适合开发,但生产连接器应使用 Google Cloud Storage,以提高可伸缩性并启用完全协调模式。此方法可高效处理大型数据集,并支持自动删除第三方数据源中不再存在的文档。

将文档转换为 JSONL

如需准备文档以批量导入 Discovery Engine,请将其转换为 JSON 行格式。

Python

    def convert_documents_to_jsonl(
        documents: List[discoveryengine.Document],
    ) -> str:
        """Serialize Document messages to JSONL."""
        return "\n".join(
            discoveryengine.Document.to_json(doc, indent=None)
            for doc in documents
        ) + "\n"

convert_documents_to_jsonl 函数使用以下变量:

  • documents:要转换的 discoveryengine.Document 对象的列表。

上传到 Google Cloud Storage

为了实现高效的批量导入,请将数据暂存在 Google Cloud Storage 中。

Python

    def upload_jsonl_to_gcs(jsonl: str, bucket_name: str, blob_name: str) -> str:
        """Upload JSONL content to Google Cloud Storage."""
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(jsonl, content_type="application/json")
        return f"gs://{bucket_name}/{blob_name}"

upload_jsonl_to_gcs 函数使用以下键变量:

  • jsonl:要上传的 JSONL 格式的字符串内容。
  • bucket_name: Google Cloud Storage 存储桶的名称。
  • blob_name:指定存储桶中的 blob(对象)的名称。

从 Google Cloud Storage 导入并进行完全协调

如需使用完全协调模式执行完整的数据同步,请使用此方法。这样可确保数据存储区完全镜像第三方数据源,并自动移除不再存在的所有文档。

Python

    def import_documents_from_gcs(
        project_id: str,
        location: str,
        data_store_id: str,
        branch_id: str,
        gcs_uri: str,
    ) -> discoveryengine.ImportDocumentsMetadata:
        """Bulk-import documents from Google Cloud Storage with FULL reconciliation mode."""
        client = discoveryengine.DocumentServiceClient()
        parent = client.branch_path(
            project=project_id,
            location=location,
            data_store=data_store_id,
            branch=branch_id,
        )
        gcs_source = discoveryengine.GcsSource(input_uris=[gcs_uri])
        request = discoveryengine.ImportDocumentsRequest(
            parent=parent,
            gcs_source=gcs_source,
            reconciliation_mode=
                discoveryengine.ImportDocumentsRequest
                .ReconciliationMode.FULL,
        )
        operation = client.import_documents(request=request)
        operation.result()
        return operation.metadata

import_documents_from_gcs 函数使用以下键变量:

  • project_id:您的 Google Cloud 项目的 ID。
  • location:数据存储区的 Google Cloud 位置。
  • data_store_id:数据存储区的 ID。
  • branch_id:数据存储区中分支的 ID(通常为“0”)。
  • gcs_uri:指向 JSONL 文件的 Google Cloud Storage URI。

测试 Google Cloud Storage 上传

如需验证基于 Google Cloud Storage 的导入工作流,请执行以下操作:

Python

  BUCKET = "your-existing-bucket"
  BLOB = "path-to-any-blob/wp/posts.jsonl"
  SITE = "https://altostrat.com"
  PROJECT_ID = "ucs-3p-connectors-testing"
  LOCATION = "global"
  IDENTITY_MAPPING_STORE_ID = "your-unique-ims-id17" # A unique ID for your new store
  DATA_STORE_ID = "your-data-store-id"
  BRANCH_ID = "0"
  jsonl_payload = convert_documents_to_jsonl(docs)
  gcs_uri = upload_jsonl_to_gcs(jsonl_payload, BUCKET, BLOB)
  posts = fetch_posts(SITE)
  docs = convert_posts_to_documents(posts)
  print(f"Fetched {len(posts)} posts and converted to {len(docs)} documents.")
  print("Uploaded to:", gcs_uri)

  metadata = import_documents_from_gcs(
      PROJECT_ID, LOCATION, DATA_STORE_ID, BRANCH_ID, gcs_uri
  )
  print(f"Imported: {metadata.success_count} documents")

以下键变量用于测试 Google Cloud Storage 上传:

  • BUCKET: Google Cloud Storage 存储桶的名称。
  • BLOB:存储桶中 blob 的路径。
  • SITE:第三方数据源的基础网址。
  • PROJECT_ID:您的 Google Cloud 项目 ID。
  • LOCATION:资源所在的 Google Cloud 位置(例如,“全球”)。
  • IDENTITY_MAPPING_STORE_ID:身份映射存储区的唯一 ID。
  • DATA_STORE_ID:数据存储区的唯一 ID。
  • BRANCH_ID:数据存储区中分支的 ID(通常为“0”)。
  • jsonl_payload:转换为 JSONL 格式的文档。
  • gcs_uri:上传的 JSONL 文件的 Google Cloud Storage URI。

预期输出:

Shell

    Fetched 20 posts and converted to 20 documents.
    Uploaded to: gs://alex-de-bucket/wp/posts.jsonl
    Imported: 20 documents

管理权限

为了在企业环境中管理文档级访问权限,Gemini Enterprise 支持访问控制列表 (ACL) 和身份映射,有助于限制用户可以查看的内容。

在数据存储区中启用 ACL

如需在创建数据存储区时启用 ACL,请执行以下操作:

Python

  # get_or_create_data_store()
  "data_store": discoveryengine.DataStore(
      display_name=data_store_id,
      industry_vertical=discoveryengine.IndustryVertical.GENERIC,
      acl_enabled=True, # ADDED
  )

向文档添加 ACL

如需在转换文档时计算并包含 AclInfo,请执行以下操作:

Python

  # convert_posts_to_documents()
  doc = discoveryengine.Document(
      id=str(post["id"]),
      json_data=json.dumps(payload),
      acl_info=discoveryengine.Document.AclInfo(
          readers=[{
              "principals": [
                  {"user_id": "baklavainthebalkans@gmail.com"},
                  {"user_id": "cloudysanfrancisco@gmail.com"}
              ]
          }]
      ),
  )

将内容设为公开

如需将文档设为可公开访问,请按如下所述设置 readers 字段:

Python

  readers=[{"idp_wide": True}]

验证 ACL

如需验证 ACL 配置是否按预期运行,请考虑以下事项:

  • 以无权访问相应文档的用户的身份进行搜索。

  • 检查 Cloud Storage 中上传的文档结构,并将其与引用进行比较。

JSON

  {
    "id": "108",
    "jsonData": "{...}",
    "aclInfo": {
      "readers": [
        {
          "principals": [
            { "userId": "baklavainthebalkans@gmail.com" },
            { "userId": "cloudysanfrancisco@gmail.com" }
          ],
          "idpWide": false
        }
      ]
    }
  }

使用身份映射

在以下情况下使用身份映射:

  • 您的第三方数据源使用非 Google 身份

  • 您希望引用自定义群组(例如 wp-admins),而不是单个用户

  • API 仅返回群组名称

  • 您需要手动对用户进行分组,以实现规模或一致性

如需执行身份映射,请按以下步骤操作:

  1. 创建并关联身份数据存储区。
  2. 导入外部身份(例如,external_group:wp-admins)。 导入时,请勿添加 external_group: 前缀,例如:

    JSON

      {
        "externalIdentity": "wp-admins",
        "userId": "user@example.com"
      }
    
  3. 文档ACL 信息中,定义 principal identifier 中的外部实体 ID。引用自定义群组时,请在 groupId 字段中使用 external_group: 前缀。

  4. 在导入期间,文档的 ACL 信息中的群组 ID 需要使用 external_group: 前缀,但在将身份导入映射存储区时,系统不会使用该前缀。 包含身份映射的示例文档:

    JSON

      {
        "id": "108",
        "aclInfo": {
          "readers": [
            {
              "principals": [
                {
                  "userId": "cloudysanfrancisco@gmail.com"
                },
                {
                  "groupId": "external_group:wp-admins"
                }
              ]
            }
          ]
        },
        "structData": {
          "id": 108,
          "date": "2025-04-24T18:16:04",
          ...
        }
      }
    
Gemini Enterprise 客户端库