从 Kafka 迁移到 Pub/Sub Lite

如果您正在考虑从自行管理迁移,本文档非常有用 Apache Kafka 到 Pub/Sub Lite。

Pub/Sub Lite 概览

Pub/Sub Lite 是一种高容量消息传递服务,专为 降低运营成本Pub/Sub Lite 提供可用区级和区域级 存储空间和预预配容量。在 Pub/Sub Lite 中 您可以选择可用区级或区域级精简版主题区域级精简版主题优惠 提供与 Pub/Sub 主题相同的可用性服务等级协议 (SLA)。不过, Pub/Sub 与 Pub/Sub Lite 消息复制

如需详细了解 Pub/Sub 和 Pub/Sub Lite, 请参阅什么是 Pub/Sub

如需详细了解精简版支持的区域和可用区,请参阅 Pub/Sub 精简版位置

Pub/Sub Lite 中的术语

以下是 Pub/Sub Lite 的一些关键术语。

  • 消息 -通过 Pub/Sub Lite 服务移动的数据。

  • 主题 -表示消息供稿的已命名资源。介于 Pub/Sub Lite,您可以选择创建可用区级或区域级 精简版主题。Pub/Sub Lite 区域级主题将数据存储在 单个区域的两个可用区Pub/Sub 精简版可用区级主题 可在一个可用区内复制数据。

  • 预留。一个已命名的吞吐量容量池 一个区域内的精简版主题。

  • Subscription:表示对订阅事件感兴趣的已命名资源 接收来自特定精简版主题的消息。订阅与此类似 Kafka 中仅连接到单个主题的使用方群组。

  • 订阅者 -接收消息的 Pub/Sub Lite 的客户端 从精简版主题和指定订阅中获取。订阅 可以有多个订阅者客户端。在这种情况下,这些消息 在订阅者客户端之间实现负载均衡在 Kafka 中 称为“使用方”

  • 发布商。创建消息并发送(发布)的应用 特定精简版主题一个主题可以有多个发布者。 在 Kafka 中,发布者称为生产者。

Kafka 和 Pub/Sub Lite 之间的区别

虽然 Pub/Sub Lite 在概念上与 Kafka 类似, 它属于另一个系统,其 API 范围更窄,更注重 数据注入。虽然这些差异对于 流提取和处理,但在一些特定用例中, 这些差异很重要

Kafka 作为数据库

与 Kafka 不同,Pub/Sub Lite 目前不支持 事务发布或日志压缩,尽管支持幂等性。 将 Kafka 用作数据库时, 消息传输系统如果您主要将 Kafka 用作数据库,请考虑运行 自己的 Kafka 集群,或使用托管式 Kafka 解决方案,例如 Confluent Cloud。 如果这两种解决方案都不可行,您也可以考虑使用 可横向扩容的数据库,如 Cloud Spanner

Kafka 数据流

Kafka 流 是一个基于 Kafka 构建的数据处理系统。虽然它允许 客户客户端注入,需要所有管理员的访问权限 operations.Kafka Streams 还使用事务型数据库属性, 用于存储内部元数据。Pub/Sub Lite 目前无法用于 Kafka Streams 应用。

Apache Beam 是一个类似的流式数据处理系统,与 Kafka 集成, Pub/Sub 和 Pub/Sub Lite 您可以使用 Dataflow 或者您自己的 Apache FlinkApache Spark 集群。

监控

Kafka 客户端可以读取服务器端指标。在 Pub/Sub Lite 中 与发布商和订阅者行为相关的指标 通过 Cloud Monitoring 而无需进行额外配置

容量管理

Kafka 主题的容量由集群的容量决定。 复制、密钥压缩和批处理设置决定了所需的容量 为 Kafka 集群上的任何给定主题提供服务。Kafka 的吞吐量 主题受 Broker 所在机器容量的限制 正在运行。相比之下,您必须同时定义存储空间和吞吐量 Pub/Sub Lite 主题的容量。 Pub/Sub Lite 存储容量是 主题的可配置属性。吞吐量容量基于 配置的容量 reservation、 以及每个分区的固有或配置的限制。

身份验证和安全性

Apache Kafka 支持多种开放式身份验证和加密机制。 使用 Pub/Sub Lite 时,身份验证基于 IAM 系统。静态加密可确保安全 以及运输途中如需详细了解 Pub/Sub Lite 身份验证,请参阅 迁移工作流部分(本文档后面部分)。

将 Kafka 属性映射到 Pub/Sub Lite 属性

Kafka 有许多配置选项来控制主题结构, 以及代理人属性适合数据注入的一些常见模板 及其等效项 Pub/Sub Lite。由于 Pub/Sub Lite 是一种 因此您不必考虑 许多 broker 属性。

主题配置属性

Kafka 属性 Pub/Sub Lite 属性 说明
retention.bytes 每个分区的存储空间 精简版主题中的所有分区都具有相同的存储容量。精简版主题的总存储容量是该主题中所有分区的存储空间容量总和。
retention.ms 消息保留期限 精简版主题存储消息的最长时间。如果您未指定消息保留期限,精简版主题会一直存储消息,直至您超出存储容量。
flush.msacks 无法在 Pub/Sub Lite 中配置 发布只有在保证持久保存到复制的存储区之后,才会得到确认。
max.message.bytes 无法在 Pub/Sub Lite 中配置 3.5 MiB 是可以发送到 Pub/Sub Lite 的消息大小上限。消息大小以可重复的方式计算。
message.timestamp.type 无法在 Pub/Sub Lite 中配置 在使用使用方实现时,系统会选择事件时间戳(如果存在),或改用发布时间戳。使用 Beam 时,发布时间戳和事件时间戳均可用。

如需详细了解精简版主题属性 请参阅精简版主题的属性

提供方配置属性

Pub/Sub Lite 支持 生产者线路协议。 某些属性会更改提供方 Cloud 客户端库的行为; 下表讨论了一些常用选项。

Kafka 属性 Pub/Sub Lite 属性 说明
auto.create.topics.enable 无法在 Pub/Sub Lite 中配置 为 Pub/Sub 精简版中为单个主题创建主题和订阅,其大致等同于 Pub/Sub 精简版中单个主题的使用方群组。您可以使用控制台、gcloud CLI、API 或 Cloud 客户端库。
key.serializervalue.serializer 无法在 Pub/Sub Lite 中配置

使用 Kafka Producer 或等效库通过传输协议进行通信时,是必需的。

batch.size Pub/Sub Lite 支持 支持批处理。为获得最佳性能,此值的建议值为 10 MiB。
linger.ms Pub/Sub Lite 支持 支持批处理。为获得最佳性能,此值的建议值为 50 毫秒。
max.request.size Pub/Sub Lite 支持 服务器对每个批次施加的上限为 20 MiB。在 Kafka 客户端中,将此值设置为低于 20 MiB。
enable.idempotence Pub/Sub Lite 支持
compression.type Pub/Sub Lite 不支持 您必须将此值明确设置为 none

使用方配置属性

Pub/Sub Lite 支持 消费线协议。 某些属性会更改使用方 Cloud 客户端库的行为; 下表讨论了一些常用选项。

Kafka 属性 说明
key.deserializervalue.deserializer

使用 Kafka Consumer 或等效库通过传输协议进行通信时,是必需的。

auto.offset.reset 不支持或不需要此配置。我们保证在创建订阅后具有指定的偏移位置。
message.timestamp.type 发布时间戳始终可从 Pub/Sub Lite 获取,并且保证在每个分区中不递减。事件时间戳是否存在,具体取决于它们在发布时是否附加到了消息中。使用 Dataflow 时,发布时间戳和事件时间戳可同时提供。
max.partition.fetch.bytesmax.poll.records 对从 polol()调用返回的记录数和字节数以及从内部提取请求返回的字节数设置了软限制。将“max.Partition.fetch.bytes”默认值设为 1MiB 可能会限制客户端的吞吐量,请考虑提高此值。

比较 Kafka 和 Pub/Sub Lite 功能

下表比较了 Apache Kafka 特性与 Pub/Sub Lite 特性:

功能 Kafka Pub/Sub 精简版
消息排序
删除重复消息 有,使用 Dataflow
推送订阅 是,使用 Pub/Sub 导出
交易次数
消息存储 受可用机器存储空间的限制 无限制
消息重放
日志记录和监控 自行管理 通过 Cloud Monitoring 自动执行
流处理 使用 Kafka StreamsApache BeamDataproc 时,可以。 可以使用 Beam 或 Dataproc。

下表比较了哪些功能使用 Kafka 自行托管,哪些功能由 Google 使用 Pub/Sub Lite 管理:

功能 Kafka Pub/Sub 精简版
适用范围 手动将 Kafka 部署到其他位置。 已在全球范围内部署。请参阅 locations
灾难恢复 设计和维护您自己的备份和复制。 由 Google 管理。
基础架构管理 手动部署和运行虚拟机 (VM) 或机器。保持一致的版本控制和补丁。 由 Google 管理。
容量规划 提前手动规划存储和计算需求。 由 Google 管理。您可以随时增加计算和存储容量。
支持 无。 提供 24 小时值班员工和支持服务。

Kafka 和 Pub/Sub Lite 费用比较

在 Pub/Sub Lite 中估算和管理费用的方式 与 Kafka 不同。本地或本地 Kafka 集群的费用 包括机器、磁盘、网络、入站邮件和出站邮件的费用 消息。还包括管理和维护这些 系统及其相关基础设施管理 Kafka 集群时 您必须手动升级机器、规划集群容量 实施包括广泛的规划和测试的灾难恢复。 您必须汇总所有这些不同的费用, 总拥有成本 (TCO)。

Pub/Sub Lite 价格包含预留费用 (发布的字节数、订阅的字节数、 Kafka 代理处理的字节数)和 预配存储的费用您只需为自己使用的资源付费 外加出站邮件费用。您可以使用 价格计算器:用于估算费用。

迁移工作流

要将主题从 Kafka 集群迁移到 Pub/Sub Lite,请按照以下说明操作。

配置 Pub/Sub Lite 资源

  1. 创建 Pub/Sub 精简版 预期吞吐量的预留

    使用 Pub/Sub Lite 价格计算器:用于计算汇总 现有 Kafka 主题的吞吐量指标。更多信息 如何创建预留 请参阅创建和管理精简版预留

  2. 为每个相应的主题创建一个 Pub/Sub 精简版主题

    如需详细了解如何创建精简版主题, 请参阅创建和管理精简版主题

  3. 为每个相应的 使用者群组和主题对

    例如,对于名为 consumers 的使用方群组,该使用方使用 topic-atopic-b,你必须consumers-a创建订阅 附加到 topic-a,订阅 consumers-b 附加到 topic-b。 如需详细了解如何创建订阅 请参阅创建和管理精简版订阅

向 Pub/Sub Lite 进行身份验证

根据您的 Kafka 客户端类型,选择以下方法之一:

基于 Java 的 Kafka 客户端 3.1.0 版或更高版本(需要重新构建)

适用于可以重新构建的基于 Java 的 3.1.0 或更高版本的 Kafka 客户端 在运行 Kafka 客户端的实例上:

  1. 安装 com.google.cloud:pubsublite-kafka-auth 软件包。

  2. 获取向 验证身份的必要参数 Pub/Sub Lite com.google.cloud.pubsublite.kafka.ClientParameters.getParams.

    getParams() 方法(请参阅 代码示例 ) 初始化以下代码 JAASSASL 作为参数 Pub/Sub Lite:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=http://localhost:14293
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    

运行 3.1.0 或更高版本且基于 Java 的 Kafka 客户端,无需重新构建

对于支持 KIP-768 的 Kafka 客户端,我们支持 使用 Python Sidecar 脚本的仅限配置的 OAUTHBEARER 身份验证。 这些版本包括 2022 年 1 月 Java 3.1.0 或更高版本。

在运行 Kafka 客户端的实例上执行以下步骤:

  1. 安装 Python 3.6 或更高版本。

    请参阅安装 Python

  2. 安装 Google 身份验证软件包:pip install google-auth

    此库简化了各种服务器到服务器身份验证 访问 Google API 的机制。请参阅 google-auth 页面

  3. 运行 kafka_gcp_credentials.py 脚本。

    此脚本会启动本地 HTTP 服务器并获取默认的 Google Cloud 凭据 使用 google.auth.default().

    所提取的凭据中的主账号 必须具有pubsublite.locations.openKafkaStream 您正在使用的 Google Cloud 项目的权限,以及 连接。Pub/Sub Lite 发布者 (roles/pubsublite.publisher) 和 Pub/Sub Lite 订阅者 (roles/pubsublite.subscriber) 个角色具有以下权限: 所需权限。将这些角色添加到 principal [主账号]

    这些凭据用于 SASL/OAUTHBEARER 身份验证 读取选项

    您的媒体资源中必须有以下参数 从 Kafka 客户端向 Pub/Sub Lite 进行身份验证:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=localhost:14293
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required clientId="unused" clientSecret="unused" \
      extension_pubsubProject="PROJECT_ID";
    

    PROJECT_ID 替换为运行 Pub/Sub Lite 的项目的 ID。

无需重建的所有其他客户端

对于所有其他客户端,请执行以下步骤:

  1. 下载服务账号密钥 JSON 文件 为客户端使用的服务账号指定访问权限。

  2. 使用 base64-encode 对服务账号文件进行编码,以用作您的 身份验证字符串。

    在 Linux 或 macOS 系统上,您可以使用 base64 命令 (通常默认安装),如下所示:

    base64 < my_service_account.json > password.txt
    

    您可以使用密码文件的内容进行身份验证 参数如下。

    Java

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
     username="PROJECT_ID" \
     password="contents of base64 encoded password file";
    

    PROJECT_ID 替换为运行 Pub/Sub 的项目的 ID。

    利德卡夫卡

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.username=PROJECT_ID
    sasl.password=contents of base64 encoded password file
    

    PROJECT_ID 替换为运行 Pub/Sub 的项目的 ID。

使用 Kafka Connect 克隆数据

Pub/Sub Lite 团队负责维护 Kafka Connect 接收器。您可以配置此实现以复制数据 从 Kafka 主题转移到 Pub/Sub Lite 主题 Kafka Connect 集群。

要配置连接器以执行数据复制,请参阅 Pub/Sub Group Kafka Connector

如果要确保分区亲和性不受迁移影响 确保 kafka 主题和 Pub/Sub 精简版主题 相同数量的分区,并且 pubsublite.ordering.mode 属性设置为 KAFKA。这会使该连接器将邮件转送到 与 kafka 分区具有相同索引的 Pub/Sub Lite 分区 这些内容最初发布到此处

迁移使用方

Pub/Sub Lite 的资源模型与 Kafka 的资源模型不同。最值得注意的是, 与使用方群组不同,订阅是显式资源, 与一个主题相关联。由于存在这种差异, 需要传递 topic(完整订阅)的 Kafka Consumer API 路径。

除了用于 Kafka 客户端的 SASL 配置之外, 使用 Kafka Consumer API 时,还需要进行以下设置 Pub/Sub Lite 的交互方式。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443
group.id=unused

REGION 替换为 Pub/Sub 精简版订阅所在的区域

启动第一个 Pub/Sub Lite 使用方作业之前, 您可以发起(但不要等待) admin See 操作来为 Cloud Storage 存储分区设置 您的消费者。

当您启动使用方时,他们会重新连接到 消息积压。同时运行旧版和 只要验证这些客户的行为 然后再关闭旧的消费者客户端

迁移提供方

除了用于 Kafka 客户端的 SASL 配置之外, 使用 用于与 Pub/Sub Lite 交互的 Kafka Producer API。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443

REGION 替换为 Pub/Sub 精简版主题所在的区域

迁移完主题的所有使用方后,即可读取该主题 将生产者流量写入 Pub/Sub Lite

逐步迁移生产者客户端,以写入 Pub/Sub Lite 主题,而不是 Kafka 主题。

重启提供方客户端以获取新配置。

关闭 Kafka Connect

迁移所有要写入的提供方后 Pub/Sub Lite,则连接器不会再复制数据。

您可以关闭 Kafka Connect 实例。

排查 Kafka 连接问题

由于 Kafka 客户端通过定制的线路协议进行通信, 我们无法就所有请求中的失败情况提供错误消息。 依赖于在消息中发送的错误代码。

您可以通过以下方法详细了解客户端中发生的错误: 将 org.apache.kafka 前缀的日志记录级别设置为 FINEST

吞吐量低且积压输入不断增加

吞吐量低的原因可能有多种 以及不断增加的积压输入量。其中一个原因可能是容量不足。

您可以在主题级别或通过 预留资源如果订阅和 配置发布作业后, 订阅和发布受到限制。

此吞吐量错误由 topic/flow_control_status 指标 以及 subscription/flow_control_status “订阅人数”指标该指标提供以下状态:

  • NO_PARTITION_CAPACITY:此消息表示每个分区 达到吞吐量上限。

  • NO_RESERVATION_CAPACITY:此消息表示每项预留 达到吞吐量上限。

您可以查看主题或预留的利用率图表 发布和订阅配额,并检查利用率是否等于或接近 100%。

要解决此问题,请增加 吞吐量容量 主题或预留。

主题授权失败错误消息

如果使用 Kafka API 发布,精简版服务代理必须具有 发布至 Pub/Sub Lite 主题的适当权限。

在该事件中,您的客户端中收到 TOPIC_AUTHORIZATION_FAILED 错误 您没有适当的权限,无法将应用发布到 Pub/Sub 精简版主题。

如需解决此问题,请检查项目的精简版服务代理是否通过了测试 。

“主题无效”错误消息

使用 Kafka API 订阅需要传递完整的订阅路径 Kafka Consumer API 中需要 topic 的所有位置。

如果没有,您会在使用方客户端中收到 INVALID_TOPIC_EXCEPTION 错误 传递格式正确的订阅路径。

不使用预留时请求无效

使用 kafka 有线协议支持要求所有主题都有关联的 预留,以便收取使用费。