Apache Kafka
借助 Apache Kafka 连接器,您可以对 Apache Kafka 数据库执行插入、删除、更新和读取操作。
支持的版本
Apache Kafka 连接器利用原生客户端库建立与给定 Kafka 集群的连接,并且该连接器适用于客户端版本 3.3.1。但是,连接器 能够与版本 3.0 到 3.3.1 的 Kafka 集群建立连接。
准备工作
在使用 Apache Kafka 连接器之前,请先完成以下任务:
- 在您的 Google Cloud 项目中:
- 将 IAM 角色 roles/connectors.admin 授予该用户 配置连接器。
- 将以下 IAM 角色授予您要用其来使用连接器的服务账号:
roles/secretmanager.viewer
roles/secretmanager.secretAccessor
服务账号是一种特殊类型的 Google 账号,用于表示需要验证身份并获得授权以访问 Google API 数据的非人类用户。如果您没有服务账号,则必须创建一个服务账号。如需了解详情,请参阅创建服务账号。
- 启用以下服务:
secretmanager.googleapis.com
(Secret Manager API)connectors.googleapis.com
(Connectors API)
如需了解如何启用服务,请参阅启用服务。
如果之前没有为您的项目启用这些服务或权限,则在您配置连接器时,系统会提示您启用。
配置连接器
配置连接器时,您需要创建与数据源(即后端系统)的连接。一个连接需专用于一个数据源。这意味着,如果您有许多数据源,则必须为每个数据源创建单独的连接。如需创建连接,请执行以下步骤:
- 在 Cloud 控制台 中,进入 Integration Connectors > 连接页面,然后选择或创建一个 Google Cloud 项目。
- 点击 + 新建以打开创建连接页面。
- 在位置步骤中,选择新的 Apache Kafka 连接的位置:
- 区域:从列表中选择一个区域。
- 点击下一步。
- 在连接详情步骤中,提供有关新 Apache Kafka 连接的详细信息:
- 连接器版本:从列表中选择 Apache Kafka 连接器的可用版本。
- 连接名称:输入 Apache Kafka 连接的名称。
- (可选)说明:输入连接的说明。
- (可选)启用 Cloud Logging:选中此复选框可存储连接的所有日志数据。
- 服务账号:选择具有 Apache Kafka 连接所需 IAM 角色的服务账号。
- 对于 Apache Kafka 连接,启用事件订阅、实体和操作选项默认处于选中状态。
- 类型检测方案 :选择
MessageOnly
。 - 注册表服务:用于处理主题架构的 Schema Registry 服务。
- 注册表类型:为特定主题指定的架构类型。
-
注册表版本:从
RegistryUrl
中读取的指定主题的架构版本。 -
注册表用户:要在
RegistryUrl
中指定的服务器进行授权的用户名或访问权限键值对。 -
注册表密码:Secret Manager Secret,其中包含要在
RegistryUrl
中指定的服务器授权的密码/Secret 键值对。 - (可选)配置连接节点设置:
- 节点数下限:输入连接节点数下限。
- 节点数上限:输入连接节点数上限。
节点是处理事务的连接单元(或副本)。 连接处理越多事务就需要越多节点,相反,处理越少事务需要越少节点。 如需了解节点如何影响连接器价格,请参阅连接节点的价格。如果未输入任何值,则默认情况下,节点数下限设置为 2(以便提高可用性),节点数上限设置为 50。
- (可选)点击 + 添加标签,以键值对的形式为连接添加标签。
- 启用 SSL:此字段用于设置是否启用 SSL。
- 点击下一步。
- 在目标部分中,输入要连接到的远程主机(后端系统)的详细信息。
- 目的地类型:选择目的地类型。
- 从列表中选择主机地址,以指定目的地的主机名或 IP 地址。
- 如果要与后端系统建立专用连接,请从列表中选择端点连接,然后从端点连接列表中选择所需的端点连接。
如果要与后端系统建立公共连接以提高安全性,您可以考虑为连接配置静态出站 IP 地址,然后将防火墙规则配置为仅将特定静态 IP 地址列入许可名单。
要输入其他目标,请点击 + 添加目标。
- 点击下一步。
- 目的地类型:选择目的地类型。
-
在 Authentication(身份验证)部分中,输入身份验证详细信息。
- 选择身份验证类型,然后输入相关详情。
Apache Kafka 连接支持以下身份验证类型:
-
用户名和密码
- 用户名:用于连接的 Apache Kafka 用户名。
- 密码:包含与 Apache Kafka 用户名关联的 Secret Manager Secret。
- 身份验证方案:用于身份验证的方案。
Apache Kafka 连接支持以下身份验证方案:
- 纯色
- SCRAM-SHA-1
- SCRAM-SHA-256
-
不可用
如果您要使用匿名登录,请选择不可用。
-
用户名和密码
- 点击下一步。
- 选择身份验证类型,然后输入相关详情。
- 输入死信配置。如果您配置了死信,则连接会写入
将未处理的事件发送到指定的 Pub/Sub 主题。输入以下详细信息:
- 死信项目 ID :您在其中配置了死信 Pub/Sub 主题的 Google Cloud 项目的 ID。
- 死信主题:您要将未处理事件的详细信息写入的 Pub/Sub 主题。
- 点击下一步。
- 查看:查看您的连接和身份验证详细信息。
- 点击创建。
实体、操作和动作
所有集成连接器都会为所连接应用的对象提供抽象层。您只能通过此抽象访问应用的对象。抽象作为实体、操作和动作向您展示。
- 实体:实体可以被视为连接的应用或服务中的对象或属性集合。不同连接器的实体定义也会有所不同。例如,在数据库连接器中,表是实体;在文件服务器连接器中,文件夹是实体;在消息传递系统连接器中,队列是实体。
但是,连接器可能不支持或不支持任何实体,在这种情况下, “
Entities
”列表将为空。 - 操作:操作是指您可以对实体执行的操作。您可以对实体执行以下任一操作:
从可用列表中选择一个实体,系统会生成该实体可用的操作列表。如需了解操作的详细说明,请参阅连接器任务的实体操作。但是,如果连接器不支持任何实体操作, 操作未在
Operations
列表中列出。 - 动作:动作是可通过连接器接口提供给集成的头等函数。动作可让您对一个或多个实体进行更改,并且动作因连接器而异。通常,操作将具有一些输入参数和一个输出参数。但可能的情况是,连接器不支持任何动作,在这种情况下,
Actions
列表将为空。
系统限制
Apache Kafka 连接器每个节点每秒最多可处理 50 笔事务,并会对超出此限制的所有事务进行节流。默认情况下,Integration Connectors 会为连接分配 2 个节点(以提高可用性)。
如需了解适用于集成连接器的限制,请参阅限制。
操作
PublishMessage 操作
此操作会向 Apache Kafka 主题发布消息。下表介绍了 PublishMessage
操作的输入和输出参数。
输入参数
参数名称 | 需要 | 数据类型 | 说明 |
---|---|---|---|
主题 | 是 | 字符串 | 您要向其发布消息的主题的名称。 |
分区 | 否 | 字符串 | 消息分配到的分区。该值必须适用于给定主题。如果您未设置此值,原生客户端会自动设置此值。 |
键 | 否 | 字符串 | 消息键。 |
消息 | 是 | 字符串 | 要发布的消息。该消息应为字符串化 JSON,并且 支持的邮件大小上限为 10MB。 |
HasBytes | 否 | 布尔值 | 指定消息是否采用二进制格式。 |
MessageBytes | 否 | 字符串 | 以 Base64 编码字符串形式的消息。 |
验证 | 否 | 布尔值 | 指定是否必须根据消息架构验证要发布的消息
。如果您在
主题的架构定义从注册表中用于
验证目的。此字段的默认值为 false 。 |
输出参数
参数名称 | 数据类型 | 说明 |
---|---|---|
PartitionWritten | 整数 | 写入消息的分区。 |
OffsetWritten | 长 | 消息写入的分区中的位置。 |
TimestampWritten | 长 | 消息提交到分区的时间(Unix 时间戳)。 |
KeyWritten | 字符串 | 已写入的消息键的值。如果在写入消息时未提供消息密钥,则此值为 NULL。 |
成功 | 布尔值 | 指定消息是否已发布。 |
PublishMessage
操作的示例响应如下所示:
{Success: true, PartitionWritten: 1, OffsetWritten: 22301, KeyWritten: "dGVzdA==", TimestampWritten: 1690806748}
Confluent Cloud 的配置
Confluent Cloud 的配置与之前记录的 Apache Kafka 步骤略有不同。请考虑以下几点 创建 Confluent Cloud 连接时,请注意以下事项:
- Confluent Cloud Cluster API 密钥用作用户名,该密钥的 Secret Manager 密钥用作 连接到引导服务器您需要在 Confluent Cloud 中创建 API 密钥 。
- 在
Connection Details
部分中,选择使用 SSL。 - 如果您使用的是架构注册表,请配置以下值:
- 在
Connection Details
部分中:- 注册表版本:输入注册表版本号。如果您想使用
版本,输入
latest
。 - 注册表用户:输入架构注册表 API 密钥。您需要创建一个架构注册表 如果您还没有 API 密钥,请使用该密钥。
- 注册表密码:输入注册表密码的 Secret Manager Secret。
- Secret 版本:选择 Secret 版本号。
- 注册表类型:选择
Confluent
。 - 类型检测方案 :选择
MessageOnly
- 注册表版本:输入注册表版本号。如果您想使用
版本,输入
- 在
Destinations
部分的主机名字段中输入注册表网址。
使用 Terraform 创建连接
您可以使用 Terraform 资源创建新的连接。如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令。
如需查看用于创建连接的 Terraform 模板示例,请参阅模板示例。
使用 Terraform 创建此连接时,您必须在 Terraform 配置文件中设置以下变量:
参数名称 数据类型 必需 说明 type_detection_scheme ENUM True 用于与 Apache Kafka 代理进行身份验证的方案。支持的值包括:MessageOnly registry_service ENUM 错误 用于处理主题架构的架构注册表服务。支持的值包括:Confluent registry_type ENUM 错误 为特定主题指定的架构类型。支持的值包括:AVRO、JSON registry_version STRING 错误 从指定主题的 RegistryUrl 中读取的架构版本。注册表版本的有效值介于 [1,2^31-1] 之间,或者是字符串“latest”,后者会返回上次注册的架构。 registry_user STRING 错误 要在 RegistryUrl 中指定的服务器进行授权的用户名。 registry_password SECRET 错误 Secret Manager Secret,其中包含要使用 RegistryUrl 中指定的服务器授权的密码/Secret 键值。 usessl BOOLEAN 错误 此字段用于设置是否已启用 SSL。 在集成中使用 Apache Kafka 连接
创建连接后,该连接便可在 Apigee Integration 和 Application Integration。您可以通过连接器任务在集成中使用该连接。
向 Google Cloud 社区寻求帮助
您可以在 Google Cloud 中发布问题和讨论此连接器 Cloud 论坛。后续步骤
- 在