Apache Kafka

借助 Apache Kafka 连接器,您可以对 Apache Kafka 数据库执行插入、删除、更新和读取操作。

支持的版本

Apache Kafka 连接器利用原生客户端库来连接到 给定 Kafka 集群,并且该连接器适用于客户端版本 3.3.1。但是,连接器 能够与版本 3.0 到 3.3.1 的 Kafka 集群建立连接。

准备工作

在使用 Apache Kafka 连接器之前,请先完成以下任务:

  • 在您的 Google Cloud 项目中:
    • 将 IAM 角色 roles/connectors.admin 授予该用户 配置连接器。
    • 将以下 IAM 角色授予您要用其来使用连接器的服务账号:
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor

      服务账号是一种特殊类型的 Google 账号,用于表示需要验证身份并获得授权以访问 Google API 数据的非人类用户。如果您没有服务账号,则必须创建一个服务账号。如需了解详情,请参阅创建服务账号

    • 启用以下服务:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (Connectors API)

      如需了解如何启用服务,请参阅启用服务

    如果之前没有为您的项目启用这些服务或权限,则在您配置连接器时,系统会提示您启用。

配置连接器

配置连接器时,您需要创建与数据源(即后端系统)的连接。一个连接需专用于一个数据源。这意味着,如果您有许多数据源,则必须为每个数据源创建单独的连接。如需创建连接,请执行以下步骤:

  1. Cloud 控制台 中,进入 Integration Connectors > 连接页面,然后选择或创建一个 Google Cloud 项目。

    转到“连接”页面

  2. 点击 + 新建以打开创建连接页面。
  3. 位置步骤中,选择新的 Apache Kafka 连接的位置:
    1. 区域:从列表中选择一个区域。
    2. 点击下一步
  4. 连接详情步骤中,提供有关新 Apache Kafka 连接的详细信息:
    1. 连接器版本:从列表中选择 Apache Kafka 连接器的可用版本。
    2. Connection Name(连接名称):输入 Apache Kafka 连接的名称。
    3. (可选)说明 :输入连接说明。
    4. (可选)启用 Cloud Logging:选中此复选框可存储连接的所有日志数据。
    5. 服务账号:选择具有 Apache Kafka 连接所需 IAM 角色的服务账号。
    6. 对于 Apache Kafka 连接,启用事件订阅、实体和操作选项默认处于选中状态。
    7. 类型检测方案 :选择 MessageOnly
    8. 注册表服务:用于处理主题架构的架构注册表服务。
    9. 注册表类型:为特定主题指定的架构类型。
    10. 注册表版本:从 RegistryUrl 中读取的指定主题的架构版本。
    11. Registry User(注册表用户):通过 RegistryUrl 中指定的服务器授权的用户名或访问密钥值。
    12. 注册表密码:包含要使用 RegistryUrl 中指定的服务器授权的密码/密钥密钥值的 Secret Manager Secret。
    13. (可选)配置连接节点设置

      • 节点数下限:输入连接节点数下限。
      • 节点数上限:输入连接节点数上限。

      节点是处理事务的连接单元(或副本)。 连接处理越多事务就需要越多节点,相反,处理越少事务需要越少节点。 如需了解节点如何影响连接器价格,请参阅连接节点的价格。如果未输入任何值,则默认情况下,节点数下限设置为 2(以便提高可用性),节点数上限设置为 50。

    14. (可选)点击 + 添加标签,以键值对的形式为连接添加标签。
    15. 启用 SSL:此字段用于设置是否启用 SSL。
    16. 点击下一步
  5. 目标位置部分,输入您要连接的远程主机(后端系统)的详细信息。
    1. 目的地类型:选择目的地类型
      • 从列表中选择主机地址,以指定目的地的主机名或 IP 地址。
      • 如果要与后端系统建立专用连接,请从列表中选择端点连接,然后从端点连接列表中选择所需的端点连接。

      如果要与后端系统建立公共连接以提高安全性,您可以考虑为连接配置静态出站 IP 地址,然后将防火墙规则配置为仅将特定静态 IP 地址列入许可名单。

      要输入其他目标,请点击 + 添加目标

    2. 点击下一步
  6. Authentication(身份验证)部分中,输入身份验证详细信息。
    1. 选择身份验证类型,然后输入相关详情。

      Apache Kafka 连接支持以下身份验证类型:

      • 用户名和密码
        • 用户名:用于连接的 Apache Kafka 用户名。
        • 密码:包含与 Apache Kafka 用户名关联的 Secret Manager Secret。
        • 身份验证方案:用于身份验证的方案。

          Apache Kafka 连接支持以下身份验证方案:

          • 纯色
          • SCRAM-SHA-1
          • SCRAM-SHA-256
      • 不可用

        如果您要使用匿名登录,请选择不可用

    2. 点击下一步
  7. 输入死信配置。如果您配置了死信,则连接会写入 将未处理的事件发送到指定的 Pub/Sub 主题。输入以下详细信息:
    1. 死信项目 ID :您在其中配置了死信 Pub/Sub 主题的 Google Cloud 项目的 ID。
    2. 死信主题 :您要在其中写入未处理事件详细信息的 Pub/Sub 主题。
  8. 点击下一步
  9. 查看:查看您的连接和身份验证详细信息。
  10. 点击创建

实体、操作和动作

所有集成连接器都会为所连接应用的对象提供抽象层。您只能通过此抽象访问应用的对象。抽象作为实体、操作和动作向您展示。

  • 实体:实体可以被视为连接的应用或服务中的对象或属性集合。不同连接器的实体定义也会有所不同。例如,在数据库连接器中,表是实体;在文件服务器连接器中,文件夹是实体;在消息传递系统连接器中,队列是实体。

    但是,连接器可能不支持或不支持任何实体,在这种情况下, “Entities”列表将为空。

  • 操作:操作是指您可以对实体执行的操作。您可以对实体执行以下任一操作:

    从可用列表中选择一个实体,系统会生成该实体可用的操作列表。如需了解操作的详细说明,请参阅连接器任务的实体操作。但是,如果连接器不支持任何实体操作, 操作未在 Operations 列表中列出。

  • 动作:动作是可通过连接器接口提供给集成的头等函数。动作可让您对一个或多个实体进行更改,并且动作因连接器而异。通常,操作有一些输入参数和一个输出 参数。但可能的情况是,连接器不支持任何动作,在这种情况下,Actions 列表将为空。

系统限制

Apache Kafka 连接器每秒最多可以处理 50 个事务, 每节点, 并限制任何 交易量超出此限制。 默认情况下,Integration Connectors 会为连接分配 2 个节点(以提高可用性)。

如需了解适用于 Integration Connectors 的限制,请参阅限制

操作

PublishMessage 操作

此操作会向 Apache Kafka 主题发布消息。下表介绍了 PublishMessage 操作的输入和输出参数。

输入参数

参数名称 需要 数据类型 说明
主题 字符串 您要将消息发布到的主题的名称。
分区 字符串 消息分配到的分区。该值必须适用于 特定主题。如果未设置此值,则由原生客户端自动设置。
字符串 消息键。
消息 字符串 要发布的消息。该消息应为字符串化 JSON,并且 支持的邮件大小上限为 10MB。
HasBytes 布尔值 指定消息是否采用二进制格式。
MessageBytes 字符串 以 Base64 编码字符串形式的消息。
验证 布尔值 指定是否必须根据消息架构验证要发布的消息 。如果您在 主题的架构定义从注册表中用于 验证目的。此字段的默认值为 false

输出参数

参数名称 数据类型 说明
PartitionWritten 整数 写入消息的分区。
OffsetWritten 向分区中写入消息的位置。
TimestampWritten 消息提交到分区的时间(Unix 时间戳)。
KeyWritten 字符串 已写入的消息键的值。如果未提供消息键,则值为 NULL 。
成功 布尔值 指定消息是否已发布。

PublishMessage 操作的示例响应如下所示:

{Success: true,
PartitionWritten: 1,
OffsetWritten: 22301,
KeyWritten: "dGVzdA==",
TimestampWritten: 1690806748}

Confluent Cloud 的配置

Confluent Cloud 的配置略有不同 先前记录的 Apache Kafka 的步骤。请考虑以下几点 创建 Confluent Cloud 连接时,请注意以下事项:

  • Confluent Cloud Cluster API 密钥用作用户名,该密钥的 Secret Manager 密钥用作 连接到引导服务器您需要在 Confluent Cloud 中创建 API 密钥 。
  • Connection Details 部分中,选择 Use SSL(使用 SSL)。
  • 如果您使用的是架构注册表,请配置以下值:
    • Connection Details 部分中:
      • 注册表版本:输入注册表版本号。如果您想使用 版本,输入 latest
      • 注册表用户:输入架构注册表 API 密钥。您需要创建一个架构注册表 如果您还没有 API 密钥,请使用该密钥。
      • 注册表密码:输入注册表密码的 Secret Manager Secret。
      • Secret 版本:选择 Secret 版本号。
      • 注册表类型:选择 Confluent
      • 类型检测方案 :选择 MessageOnly
    • Destinations 部分的主机名字段中输入注册表网址。

    使用 Terraform 创建连接

    您可以使用 Terraform 资源以创建新连接。

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    如需查看用于创建连接的 Terraform 模板示例,请参阅示例模板

    使用 Terraform 创建此连接时,您必须在 Terraform 配置文件中设置以下变量:

    参数名称 数据类型 必需 说明
    type_detection_scheme ENUM True 用于通过 Apache Kafka Broker 进行身份验证的方案。支持的值包括:MessageOnly
    registry_service ENUM 错误 用于处理主题架构的架构注册表服务。支持的值包括:Confluent
    registry_type ENUM 错误 为特定主题指定的架构类型。支持的值包括:AVRO、JSON
    registry_version STRING 错误 从指定主题的 RegistryUrl 读取的架构版本。注册表版本的有效值介于 [1,2^31-1] 之间或字符串“latest”,此字符串会返回上次注册的架构。
    registry_user STRING 错误 通过 RegistryUrl 中指定的服务器授权的用户名。
    registry_password SECRET 错误 Secret Manager Secret,其中包含要使用 RegistryUrl 中指定的服务器进行授权的密码/Secret 键值。
    usessl BOOLEAN 错误 此字段用于设置是否启用 SSL。

    在集成中使用 Apache Kafka 连接

    创建连接后,它便可同时在 Apigee Integration 和 Application Integration。您可以使用连接 通过连接器任务在集成中进行集成。

    • 如需了解如何在 Apigee Integration 中创建和使用连接器任务,请参阅连接器任务
    • 如需了解如何在 Application Integration 中创建和使用连接器任务,请参阅连接器任务

    向 Google Cloud 社区寻求帮助

    您可以在 Google Cloud 中发布问题和讨论此连接器 Cloud 论坛

    后续步骤