本文档介绍如何使用 Pub/Sub Group Kafka Connector 集成 Apache Kafka 和 Pub/Sub Lite。
关于 Pub/Sub Group Kafka 连接器
Apache Kafka 是一个用于流式处理事件的开源平台。它通常用于分布式架构,以实现松散耦合的组件之间的通信。Pub/Sub Lite 是一种用于异步发送和接收消息的代管式服务。与 Kafka 一样,您可以使用 Pub/Sub Lite 在云架构中的组件之间进行通信。
借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。以下连接器已打包到 Connector JAR 中:
- 接收器连接器从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub Lite。
- 源连接器从 Pub/Sub Lite 主题读取消息并将其发布到 Kafka。
以下是您可以使用 Pub/Sub Group Kafka 连接器的一些场景:
- 您正在将基于 Kafka 的架构迁移到 Google Cloud。
- 您有一个前端系统,该系统可将事件存储在 Google Cloud 之外的 Kafka 中,但您还使用 Google Cloud 来运行需要接收 Kafka 事件的一些后端服务。
- 您可以从本地 Kafka 解决方案收集日志并将其发送到 Google Cloud 进行数据分析。
- 您有一个使用 Google Cloud 的前端系统,但您还使用 Kafka 在本地存储数据。
连接器需要 Kafka Connect,这是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用该连接器,您必须将 Kafka Connect 与 Kafka 集群一起运行。
本文档假定您熟悉 Kafka 和 Pub/Sub Lite。如需开始使用 Pub/Sub Lite,请参阅使用 Google Cloud 控制台在 Pub/Sub Lite 中发布和接收消息。
Pub/Sub Group Kafka Connector 使用入门
本部分将引导您完成以下任务:- 配置 Pub/Sub 群组 Kafka 连接器。
- 将事件从 Kafka 发送到 Pub/Sub Lite。
- 将消息从 Pub/Sub Lite 发送到 Kafka。
前提条件
安装 Kafka
按照 Apache Kafka 快速入门在本地机器上安装单节点 Kafka。请在快速入门中完成下列步骤:
- 下载最新的 Kafka 版本并将其解压缩。
- 启动 Kafka 环境。
- 创建一个 Kafka 主题。
身份验证
Pub/Sub 群组 Kafka 连接器必须通过 Pub/Sub 进行身份验证才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
-
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
-
为您的 Google 账号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
下载连接器 JAR
将连接器 JAR 文件下载到本地计算机。如需了解详情,请参阅 GitHub 自述文件中的获取连接器。
复制连接器配置文件
克隆或下载连接器的 GitHub 代码库。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
将
config
目录的内容复制到 Kafka 安装的config
子目录中。cp config/* [path to Kafka installation]/config/
这些文件包含连接器的配置设置。
更新 Kafka Connect 配置
- 导航到包含您下载的 Kafka Connect 二进制文件的目录。
- 在 Kafka Connect 二进制目录中,在文本编辑器中打开名为
config/connect-standalone.properties
的文件。 - 如果
plugin.path property
被注释掉,请取消注释。 更新
plugin.path property
以包含连接器 JAR 的路径。示例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
将
offset.storage.file.filename
属性设置为本地文件名。在独立模式下,Kafka 使用此文件存储偏移数据。示例:
offset.storage.file.filename=/tmp/connect.offsets
将事件从 Kafka 转发到 Pub/Sub Lite
本部分介绍如何启动接收器连接器、将事件发布到 Kafka,以及如何从 Pub/Sub Lite 读取转发的消息。
使用 Google Cloud CLI 创建 Pub/Sub Lite 预留。
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
替换以下内容:
- RESERVATION_NAME:Pub/Sub Lite 预留的名称。
- LOCATION:预留的位置。
使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
替换以下内容:
- LITE_TOPIC:要从 Kafka 接收消息的 Pub/Sub Lite 主题的名称。
- LOCATION:主题的位置。该值必须与预留的位置匹配。
- RESERVATION_NAME:Pub/Sub Lite 预留的名称。
- LITE_SUBSCRIPTION:主题的 Pub/Sub Lite 订阅的名称。
在文本编辑器中打开
/config/pubsub-lite-sink-connector.properties
文件。为以下属性的值添加在注释中标记为"TODO"
的值:topics=KAFKA_TOPICS pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.topic=LITE_TOPIC
替换以下内容:
- KAFKA_TOPICS:要读取的 Kafka 主题的英文逗号分隔列表。
- PROJECT_ID:包含您的 Pub/Sub Lite 主题的 Google Cloud 项目。
- LOCATION:Pub/Sub Lite 主题的位置。
- LITE_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub Lite 主题。
从 Kafka 目录运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-sink-connector.properties
按照 Apache Kafka 快速入门中的步骤将一些事件写入您的 Kafka 主题。
使用从精简版订阅接收消息中显示的任何方法订阅 Pub/Sub Lite 订阅。
将消息从 Pub/Sub Lite 转发到 Kafka
本部分介绍如何启动源连接器、将消息发布到 Pub/Sub Lite,以及读取从 Kafka 转发的消息。
使用 Google Cloud CLI 创建 Pub/Sub Lite 预留。
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
替换以下内容:
- RESERVATION_NAME:Pub/Sub Lite 预留的名称。
- LOCATION:预留的位置。
使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
替换以下内容:
- LITE_TOPIC:Pub/Sub Lite 主题的名称。
- LOCATION:主题的位置。该值必须与预留的位置匹配。
- RESERVATION_NAME:Pub/Sub Lite 预留的名称。
- LITE_SUBSCRIPTION:主题的 Pub/Sub Lite 订阅的名称。
在文本编辑器中打开名为
/config/pubsub-lite-source-connector.properties
的文件。为以下属性添加值,这些值在注释中标记为"TODO"
:topic=KAFKA_TOPIC pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.subscription=LITE_SUBSCRIPTION
替换以下内容:
- KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- LOCATION:Pub/Sub Lite 主题的位置。
- LITE_SUBSCRIPTION:Pub/Sub Lite 主题。
从 Kafka 目录运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-source-connector.properties
使用向精简版主题发布消息中显示的任意方法,将消息发布到 Pub/Sub Lite 主题。
阅读 Kafka 发来的消息。按照 Apache Kafka 快速入门中的步骤读取 Kafka 主题中的消息。
短信转化
Kafka 记录包含一个键和一个值,它们是可变长度的字节数组。(可选)Kafka 记录还可以包含标头,这些标头是键值对。Pub/Sub Lite 消息具有以下字段:
key
:消息键 (bytes
)data
:消息数据 (bytes
)attributes
:零个或多个属性。每个属性都是一个(key,values[])
映射。一个属性可以有多个值。event_time
:用户提供的事件时间戳(可选)。
Kafka Connect 使用转换器将键和值序列化到 Kafka 或从 Kafka 序列化键和值。如需控制序列化,请在连接器配置文件中设置以下属性:
key.converter
:用于序列化记录键的转换器。value.converter
:用于对记录值进行序列化的转换器。
从 Kafka 转换为 Pub/Sub Lite
接收器连接器将 Kafka 记录转换为 Pub/Sub Lite 消息,如下所示。
Kafka 记录 (SinkRecord ) |
Pub/Sub Lite 消息 |
---|---|
键 | key |
值 | data |
标头 | attributes |
时间戳 | eventTime |
时间戳类型 | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
主题 | attributes["x-goog-pubsublite-source-kafka-topic"] |
分区 | attributes["x-goog-pubsublite-source-kafka-offset"] |
偏移值 | attributes["x-goog-pubsublite-source-kafka-partition"] |
键、值和标头的编码如下所示:
- Null 架构被视为字符串架构。
- 字节载荷是直接写入的,不进行转换。
- 字符串、整数和浮点载荷被编码为 UTF-8 字节序列。
- 所有其他载荷都编码为协议缓冲区
Value
类型,然后转换为字节字符串。- 嵌套的字符串字段会编码到 protobuf
Value
。 - 嵌套字节字段会编码为保存 base64 编码字节的 protobuf
Value
。 - 嵌套数字字段以双精度形式编码为 protobuf
Value
。 - 不支持具有数组、映射或结构体键的映射。
- 嵌套的字符串字段会编码到 protobuf
从 Pub/Sub Lite 转换为 Kafka
源连接器将 Pub/Sub Lite 消息转换为 Kafka 记录,如下所示:
Pub/Sub Lite 消息 | Kafka 记录 (SourceRecord )
|
---|---|
key |
键 |
data |
值 |
attributes |
标头 |
event_time |
时间戳。如果 event_time 不存在,则使用发布时间。 |
配置选项
除了 Kafka Connect API 提供的配置之外,连接器还支持以下 Pub/Sub Lite 配置。
接收器连接器配置选项
接收器连接器支持以下配置选项。
设置 | 数据类型 | 说明 |
---|---|---|
connector.class |
String |
必填。连接器的 Java 类。对于 Pub/Sub 精简版接收器连接器,该值必须为 com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector 。 |
gcp.credentials.file.path |
String |
可选。指向存储用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 凭据的文件的路径。 |
gcp.credentials.json |
String |
可选。包含 Google Cloud(用于对 Pub/Sub Lite 进行身份验证)的 JSON blob。 |
pubsublite.location |
String |
必需。Pub/Sub Lite 主题的位置。 |
pubsublite.project |
String |
必需。包含 Pub/Sub Lite 主题的 Google Cloud。 |
pubsublite.topic |
String |
必需。用于发布 Kafka 记录的 Pub/Sub Lite 主题。 |
topics |
String |
必需。要读取的 Kafka 主题的英文逗号分隔列表。 |
来源连接器配置选项
源连接器支持以下配置选项。
设置 | 数据类型 | 说明 |
---|---|---|
connector.class |
String |
必填。连接器的 Java 类。对于 Pub/Sub 精简版源连接器,该值必须为 com.google.pubsublite.kafka.source.PubSubLiteSourceConnector 。 |
gcp.credentials.file.path |
String |
可选。指向存储用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 凭据的文件的路径。 |
gcp.credentials.json |
String |
可选。包含 Google Cloud(用于对 Pub/Sub Lite 进行身份验证)的 JSON blob。 |
kafka.topic |
String |
必需。从 Pub/Sub Lite 接收消息的 Kafka 主题。 |
pubsublite.location |
String |
必需。Pub/Sub Lite 主题的位置。 |
pubsublite.partition_flow_control.bytes |
Long |
每个 Pub/Sub Lite 分区的最大未完成字节数。 默认值:20,000,000 |
pubsublite.partition_flow_control.messages |
Long |
每个 Pub/Sub Lite 分区的未完成消息数量上限。 默认值: |
pubsublite.project |
String |
必填。包含 Pub/Sub Lite 主题的 Google Cloud 项目。 |
pubsublite.subscription |
String |
必需。要从中拉取消息的 Pub/Sub Lite 订阅的名称。 |
后续步骤
- 了解 Kafka 与 Pub/Sub 之间的区别。
- 详细了解 Pub/Sub Group Kafka 连接器。
- 请参阅 Pub/Sub Group Kafka Connector GitHub 代码库。