本文档介绍了如何使用 Pub/Sub Group Kafka Connector 集成 Apache Kafka 和 Pub/Sub。
Pub/Sub Group Kafka Connector 简介
Apache Kafka 是一个用于流式处理事件的开源平台。通常 用于在分布式架构中实现松散通信 耦合的组件。Pub/Sub 是一种用于异步发送和接收消息的托管式服务。与 Kafka 一样,您可以使用 使用 Pub/Sub 在云中的组件之间通信 架构。
借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。 以下连接器打包在连接器 JAR 中:
- 接收器连接器从一个或多个 Kafka 主题中读取记录,并且 发布到 Pub/Sub
- 源连接器会从 Pub/Sub 主题读取消息,并将其发布到 Kafka。
以下是您可能会使用 Pub/Sub Group Kafka Connector 的一些场景:
- 您正在将基于 Kafka 的架构迁移到 Google Cloud。
- 您有一个前端系统,用于将事件存储在 Kafka 之外的 还可使用 Google Cloud 来运行一些后端。 服务,这些服务需要接收 Kafka 事件。
- 您从本地 Kafka 解决方案收集日志并将其发送到 使用 Google Cloud 进行数据分析。
- 您有一个使用 Google Cloud 的前端系统,但您还需要存储数据 使用 Kafka
该连接器要求 Kafka Connect 它是在 Kafka 与其他系统之间流式传输数据的框架。要使用 您必须运行 Kafka Connect 和 Kafka 集群。
本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您先完成某个 Pub/Sub 快速入门。
Pub/Sub 连接器不支持 Google Cloud IAM 和 Kafka Connect ACL 之间的任何集成。
开始使用连接器
本部分将引导您完成以下任务:- 配置 Pub/Sub Group Kafka Connector。
- 将事件从 Kafka 发送到 Pub/Sub。
- 将消息从 Pub/Sub 发送到 Kafka。
前提条件
安装 Kafka
按照 Apache Kafka 快速入门 在本地机器上安装单节点 Kafka。完成下列步骤 快速入门:
- 下载最新的 Kafka 版本并将其解压缩。
- 启动 Kafka 环境。
- 创建 Kafka 主题。
身份验证
Pub/Sub Group Kafka Connector 必须通过 Pub/Sub 进行身份验证, 发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下操作: 请执行以下步骤:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
下载连接器 JAR
将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 获取连接器 。
复制连接器配置文件
克隆或下载 GitHub 代码库 。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
将
config
目录的内容复制到config
您的 Kafka 安装。cp config/* [path to Kafka installation]/config/
这些文件包含连接器的配置设置。
更新您的 Kafka Connect 配置
- 导航到包含您要创建的 Kafka Connect 二进制文件的目录 已下载。
- 在 Kafka Connect 二进制目录中,通过文本编辑器打开名为
config/connect-standalone.properties
的文件。 - 如果
plugin.path property
已被注释掉,请取消注释。 更新
plugin.path property
以包含连接器 JAR 的路径。示例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
将
offset.storage.file.filename
属性设置为本地文件名。在 独立模式下,Kafka 使用此文件存储偏移数据。示例:
offset.storage.file.filename=/tmp/connect.offsets
将事件从 Kafka 转发到 Pub/Sub
本部分介绍如何启动接收器连接器、将事件发布到 Kafka、 然后从 Pub/Sub 读取转发的消息。
使用 Google Cloud CLI 创建 Pub/Sub 主题, 订阅。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:Pub/Sub 的名称 订阅该主题。
在文本编辑器中打开
/config/cps-sink-connector.properties
文件。将 这些值在"TODO"
评论:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
替换以下内容:
- KAFKA_TOPICS:要从中读取的 Kafka 主题的英文逗号分隔列表。
- PROJECT_ID:包含您的 Pub/Sub 主题。
- PUBSUB_TOPIC:用于接收 从 Kafka 读取消息。
在 Kafka 目录中,运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
按照 Apache Kafka 快速入门中的步骤,将一些事件写入 Kafka 主题。
使用 gcloud CLI 从 Pub/Sub 中读取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
将消息从 Pub/Sub 转发到 Kafka
本部分介绍如何启动源连接器,以及如何将消息发布到 Pub/Sub 并从 Kafka 读取转发的消息。
使用 gcloud CLI 创建包含以下内容的 Pub/Sub 主题: 订阅。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:Pub/Sub 的名称 订阅。
以文本格式打开名为
/config/cps-source-connector.properties
的文件 编辑器。为以下属性(在"TODO"
评论:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
替换以下内容:
- KAFKA_TOPIC:要接收 Pub/Sub 消息。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:Pub/Sub 主题。
从 Kafka 目录运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
使用 gcloud CLI 将消息发布到 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
从 Kafka 读取消息。请按照 Apache Kafka 快速入门 从 Kafka 主题读取消息。
短信转化
Kafka 记录 包含一个键和一个值,它们是可变长度的字节数组。(可选) Kafka 记录还可以具有标头,标头是键值对。答 Pub/Sub 消息 包括两个主要部分:消息正文以及零个或多个键值属性。
Kafka Connect 使用转换器对传入和传出 Kafka 的键和值进行序列化。 如需控制序列化,请在连接器中设置以下属性 配置文件:
key.converter
:用于序列化记录键的转换器。value.converter
:用于对记录值进行序列化的转换器。
Pub/Sub 消息的正文是 ByteString
对象,因此
最高效的转换是直接复制载荷。因此,
建议使用能生成原始数据类型(整数、浮点数、
字符串或字节架构),以防止反序列化和
将相同的消息正文重新序列化。
从 Kafka 到 Pub/Sub 的转换
接收器连接器将 Kafka 记录转换为 Pub/Sub 消息, 如下:
- Kafka 记录键会以名为
"key"
的属性的形式存储在 Pub/Sub 消息中。 - 默认情况下,连接器会丢弃 Kafka 记录中的所有标头。不过,如果您将
headers.publish
配置选项设置为true
,连接器会将标头写入为 Pub/Sub 属性。连接器会跳过 任何超出 Pub/Sub 的标头 针对消息属性的限制。 - 对于整数、浮点数、字符串和字节架构,连接器会将字节 直接复制到 Pub/Sub 消息中 正文。
- 对于结构体架构,连接器将每个字段写入
Pub/Sub 消息。例如,如果该字段为
{ "id"=123 }
, 生成的 Pub/Sub 消息具有"id"="123"
属性。通过 字段值始终转换为字符串。映射和结构体类型 支持作为结构体中的字段类型。 - 对于映射架构,连接器将每个键值对作为
Pub/Sub 消息。例如,如果地图是
{"alice"=1,"bob"=2}
时,生成的 Pub/Sub 消息会有两个 属性,"alice"="1"
和"bob"="2"
。将键和值转换为 转换为字符串。
结构体架构和映射架构还有一些其他行为:
(可选)您可以通过设置
messageBodyName
配置属性,指定特定结构体字段或映射键作为消息正文。通过 字段或键的值会存储为消息正文中的ByteString
。如果 则未设置messageBodyName
,则结构体和 映射架构对于数组值,连接器仅支持基元数组类型。通过 数组中的值序列串联成单个
ByteString
对象。
从 Pub/Sub 到 Kafka 的转换
源连接器将 Pub/Sub 消息转换为 Kafka 记录 如下所示:
Kafka 记录密钥:默认情况下,密钥设置为
null
。您可以选择通过设置kafka.key.attribute
配置选项来指定要用作键的 Pub/Sub 消息属性。在这种情况下, 会查找具有该名称的属性,并将记录键设置为 属性值。如果指定的属性不存在,则记录键为 设为null
。Kafka 记录值。连接器按如下方式写入记录值:
如果 Pub/Sub 消息没有自定义属性,则连接器 将 Pub/Sub 消息正文直接写入 Kafka 记录 值指定为
byte[]
类型,并使用由value.converter
。如果 Pub/Sub 消息具有自定义属性且
kafka.record.headers
为false
,则连接器会将结构体写入记录值。该结构体包含每个属性对应的一个字段,以及一个名为"message"
的字段,其值为 Pub/Sub 消息正文(存储为字节):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
在这种情况下,您必须使用与
value.converter
struct
架构,例如org.apache.kafka.connect.json.JsonConverter
。如果 Pub/Sub 消息具有自定义属性和
kafka.record.headers
为true
,连接器将属性编写为 Kafka 记录标头。它使用value.converter
指定的转换器,将 Pub/Sub 消息正文直接作为byte[]
类型写入 Kafka 记录值。
Kafka 记录标头。默认情况下,标头是空的,除非您将
kafka.record.headers
至true
。
配置选项
除了 Kafka Connect API 提供的配置外, Pub/Sub Group Kafka Connector 支持接收器和来源配置, 具体说明 Pub/Sub 连接器配置。
获取支持
如果您需要帮助,请创建支持工单。对于常规问题和讨论,请在 GitHub 代码库中创建问题。
后续步骤
- 了解 Kafka 和 Pub/Sub 之间的差异。
- 详细了解 Pub/Sub Group Kafka Connector。
- 请参阅 Pub/Sub Group Kafka Connector 的 GitHub 代码库。