Apache Kafka

通过 Apache Kafka 连接器,您可以对 Apache Kafka 数据库执行插入、删除、更新和读取操作。

支持的版本

Apache Kafka 连接器利用原生客户端库与给定 Kafka 集群建立连接,且该连接器适用于客户端版本 3.3.1。但是,连接器可以与 Kafka 集群 3.0 到 3.3.1 之间的连接。

准备工作

在使用 Apache Kafka 连接器之前,请先完成以下任务:

  • 在您的 Google Cloud 项目中:
    • roles/connectors.admin IAM 角色授予配置连接器的用户。
    • 将以下 IAM 角色授予您要用其来使用连接器的服务账号:
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor

      服务账号是一种特殊类型的 Google 账号,用于表示需要验证身份并获得授权以访问 Google API 数据的非人类用户。如果您没有服务账号,则必须创建一个服务账号。如需了解详情,请参阅创建服务账号

    • 启用以下服务:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      如需了解如何启用服务,请参阅启用服务

    如果之前没有为您的项目启用这些服务或权限,则在您配置连接器时,系统会提示您启用。

配置连接器

配置连接器时,您需要创建与数据源(即后端系统)的连接。一个连接需专用于一个数据源。这意味着,如果您有许多数据源,则必须为每个数据源创建单独的连接。如需创建连接,请执行以下步骤:

  1. Cloud 控制台 中,进入 Integration Connectors > 连接页面,然后选择或创建一个 Google Cloud 项目。

    转到“连接”页面

  2. 点击 + 新建以打开创建连接页面。
  3. 位置部分中,选择连接的位置。
    1. 区域:从下拉列表中选择一个位置。

      如需查看所有受支持区域的列表,请参阅位置

    2. 点击下一步
  4. 连接详情部分中,完成以下操作:
    1. 连接器:从可用连接器的下拉列表中选择 Apache Kafka
    2. 连接器版本:从可用版本的下拉列表中选择一个连接器版本。
    3. 连接名称字段中,输入连接实例的名称。

      连接名称必须符合以下条件:

      • 连接名称可以使用字母、数字或连字符。
      • 字母必须小写。
      • 连接名称必须以字母开头,以字母或数字结尾。
      • 连接名称不能超过 63 个字符。
    4. (可选)输入连接实例的说明
    5. 服务账号:选择具有所需角色的服务账号。
    6. (可选)配置连接节点设置

      • 节点数下限:输入连接节点数下限。
      • 节点数上限:输入连接节点数上限。

      节点是处理事务的连接单元(或副本)。 连接处理越多事务就需要越多节点,相反,处理越少事务需要越少节点。 如需了解节点如何影响连接器价格,请参阅连接节点的价格。如果未输入任何值,则默认情况下,节点数下限设置为 2(以便提高可用性),节点数上限设置为 50。

    7. 注册表版本:从指定主题的 RegistryUrl 中读取的架构版本。
    8. 注册表用户:要在 RegistryUrl 中指定的服务器进行授权的用户名/访问权限键值对。
    9. 注册表密码:Secret Manager Secret,其中包含要在 RegistryUrl 中指定的服务器授权的密码/Secret 键值对。
    10. 注册表类型:为特定主题指定的架构类型。
    11. 注册表服务:用于处理主题架构的 Schema Registry 服务。
    12. UseSSL:此字段用于设置是否已启用 SSL。
    13. (可选)点击 + 添加标签,以键值对的形式向连接添加标签。
    14. 点击下一步
  5. 目标部分中,输入要连接到的远程主机(后端系统)的详细信息。
    1. Destination Type:选择 Destination Type
      1. 主机地址字段中,指定目标的主机名或 IP 地址。
        1. 如果要与后端系统建立专用连接,请按以下步骤操作:
          1. 创建 PSC 服务连接
          2. 创建端点连接然后在主机地址字段中输入端点连接的详细信息。
        2. 如果要与后端系统建立公共连接以提高安全性,您可以考虑为连接配置静态出站 IP 地址,然后将防火墙规则配置为仅将特定静态 IP 地址列入许可名单。

      要输入其他目标,请点击 + 添加目标

    2. 点击下一步
  6. 身份验证部分中,输入身份验证详细信息。
    1. 选择身份验证类型,然后输入相关详细信息。

      Apache Kafka 连接支持以下身份验证类型:

      • 用户名和密码
      • 匿名
    2. 如需了解如何配置这些身份验证类型,请参阅配置身份验证

    3. 点击下一步
  7. 查看:查看您的连接和身份验证详细信息。
  8. 点击创建

配置身份验证

根据您要使用的身份验证输入详细信息。

  • 用户名和密码
    • 用户名:用于连接的 Apache Kafka 用户名。
    • 密码:包含与 Apache Kafka 用户名关联的 Secret Manager Secret。
  • 匿名

    如果您要使用匿名登录,请选择不可用

实体、操作和动作

所有集成连接器都会为所连接应用的对象提供抽象层。您只能通过此抽象访问应用的对象。抽象作为实体、操作和动作向您展示。

  • 实体:实体可以被视为连接的应用或服务中的对象或属性集合。不同连接器的实体定义也会有所不同。例如,在数据库连接器中,表是实体;在文件服务器连接器中,文件夹是实体;在消息传递系统连接器中,队列是实体。

    不过,可能连接器不支持或不支持任何实体,在这种情况下,Entities 列表将为空。

  • 操作:操作是指您可以对实体执行的操作。您可以对实体执行以下任一操作:

    从可用列表中选择一个实体,系统会生成该实体可用的操作列表。如需了解操作的详细说明,请参阅连接器任务的实体操作。但是,如果连接器不支持任何实体操作,则此类不受支持的操作不会列在 Operations 列表中。

  • 动作:动作是可通过连接器接口提供给集成的头等函数。动作可让您对一个或多个实体进行更改,并且动作因连接器而异。但可能的情况是,连接器不支持任何动作,在这种情况下,Actions 列表将为空。

系统限制

Apache Kafka 连接器每秒最多可以为每个节点处理 50 个事务,并且会限制任何超出此限制的事务。默认情况下,Integration Connectors 会为连接分配 2 个节点(以提高可用性)。

如需了解适用于 Integration Connectors 的限制,请参阅限制

操作

PublishMessage 操作

此操作会向 Apache Kafka 主题发布消息。下表介绍了 PublishMessage 操作的输入和输出参数。

输入参数

参数名称 需要 数据类型 说明
主题 字符串 您要将消息发布到的主题的名称。
分区 字符串 将消息分配到的分区。该值对于给定主题必须有效。如果您未设置此值,原生客户端会自动设置此值。
字符串 消息键。
消息 字符串 您要发布的消息。消息应为字符串化的 JSON,并且支持的消息大小上限为 10 MB。
HasBytes 布尔值 指定消息是否为二进制格式。
MessageBytes 字符串 Base64 编码字符串形式的消息。
验证 布尔值 指定是否必须根据架构注册表中为主题定义的消息架构对要发布的消息进行验证。如果您在创建连接时指定了架构注册表,则系统将使用注册表中主题的架构定义来进行验证。此字段的默认值为 false

输出参数

参数名称 数据类型 说明
PartitionWritten 整数 写入消息的分区。
OffsetWritten 消息写入到的分区中的位置。
TimestampWritten 消息提交到分区的时间(Unix 时间戳)。
KeyWritten 字符串 写入的消息键的值。如果在写入消息时未提供消息键,则值为 NULL。
成功 布尔值 指定消息是否已发布。

PublishMessage 操作的示例响应如下所示:

{Success: true,
PartitionWritten: 1,
OffsetWritten: 22301,
KeyWritten: "dGVzdA==",
TimestampWritten: 1690806748}

Confluent Cloud 的配置

Confluent Cloud 的配置与之前记录的 Apache Kafka 步骤略有不同。为 Confluent Cloud 创建连接时,请考虑以下几点:

  • Confluent Cloud 集群 API 密钥用作用户名,该密钥的 Secret Manager 密钥用作连接到引导服务器的密码。如果您还没有 API 密钥,则需要在 Confluent Cloud 中创建。
  • Connection Details 部分中,选择使用 SSL
  • 如果您使用的是架构注册表,请配置以下值:
    • Connection Details 部分中:
      • 注册表版本:输入注册表版本号。如果要使用最新版本,请输入 latest
      • 注册表用户:输入架构注册表 API 密钥。如果您还没有架构注册表 API 密钥,则需要创建一个。
      • 注册表密码:输入注册表密码的 Secret Manager Secret。
      • Secret 版本:选择 Secret 版本号。
      • 注册表类型:选择 Confluent
      • 类型检测方案 :选择 MessageOnly
    • Destinations 部分的主机名字段中输入注册表网址。

    使用 Terraform 创建连接

    您可以使用 Terraform 资源创建新连接。

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    如需查看用于创建连接的示例 Terraform 模板,请参阅示例模板

    使用 Terraform 创建此连接时,您必须在 Terraform 配置文件中设置以下变量:

    参数名称 数据类型 需要 说明
    type_detection_scheme ENUM True 用于通过 Apache Kafka 代理进行身份验证的架构。支持的值包括:MessageOnly
    registry_service ENUM False 用于处理主题架构的 Schema Registry 服务。支持的值包括:Confluent
    registry_type ENUM False 为特定主题指定的架构类型。支持的值包括:AVRO、JSON
    registry_version STRING False 从指定主题的 RegistryUrl 读取的架构版本。注册表版本的有效值介于 [1,2^31-1] 或字符串“latest”(会返回上次注册的架构)之间。
    registry_user STRING False 使用 RegistryUrl 中指定的服务器进行授权的用户名。
    registry_password SECRET False Secret Manager Secret,其中包含使用 RegistryUrl 中指定的服务器进行授权的密码/Secret 键值。
    使用 SSL BOOLEAN False 此字段用于设置是否启用 SSL。

    在集成中使用 Apache Kafka 连接

    创建连接后,它在 Apigee Integration 和 Application Integration 中均可使用。您可以通过连接器任务在集成中使用该连接。

    • 如需了解如何在 Apigee Integration 中创建和使用连接器任务,请参阅连接器任务
    • 如需了解如何在 Application Integration 中创建和使用连接器任务,请参阅连接器任务

    向 Google Cloud 社区寻求帮助

    您可以在 Cloud 论坛的 Google Cloud 社区发布您的问题并讨论此连接器。

    后续步骤