本文档介绍如何使用 Pub/Sub Group Kafka Connector 集成 Apache Kafka 和 Pub/Sub。
关于 Pub/Sub Group Kafka Connector
Apache Kafka 是一个用于流式处理事件的开源平台。它通常用于分布式架构中,以实现松散耦合组件之间的通信。Pub/Sub 是一种用于异步发送和接收消息的代管式服务。与 Kafka 一样,您可以使用 Pub/Sub 在云端架构中的组件之间进行通信。
Pub/Sub Group Kafka Connector 允许您集成这两个系统。以下连接器已打包到连接器 JAR 中:
- 接收器连接器从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub。
- 源连接器从 Pub/Sub 主题读取消息并将其发布到 Kafka。
以下是您可以使用 Pub/Sub 组 Kafka 连接器的一些场景:
- 您要将基于 Kafka 的架构迁移到 Google Cloud。
- 您有一个前端系统,将事件存储在 Google Cloud 之外的 Kafka 中,但您还使用 Google Cloud 运行需要接收 Kafka 事件的一些后端服务。
- 您可以从本地 Kafka 解决方案收集日志并将其发送到 Google Cloud 进行数据分析。
- 您有一个使用 Google Cloud 的前端系统,但您还使用 Kafka 在本地存储数据。
连接器需要 Kafka Connect,这是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用该连接器,您必须同时运行 Kafka Connect 和 Kafka 集群。
本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您完成任一 Pub/Sub 快速入门。
Pub/Sub 连接器不支持 Google Cloud IAM 与 Kafka Connect ACL 之间的任何集成。
连接器使用入门
本部分将引导您完成以下任务:- 配置 Pub/Sub 组 Kafka 连接器。
- 将事件从 Kafka 发送到 Pub/Sub。
- 从 Pub/Sub 向 Kafka 发送消息。
前提条件
安装 Kafka
按照 Apache Kafka 快速入门的说明,在本地机器上安装单节点 Kafka。请在快速入门中完成以下步骤:
- 下载最新的 Kafka 版本并将其解压缩。
- 启动 Kafka 环境。
- 创建 Kafka 主题。
身份验证
Pub/Sub 组 Kafka 连接器必须通过 Pub/Sub 进行身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:
- 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
创建或选择 Google Cloud 项目。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
-
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
-
-
为您的 Google 帐号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
创建或选择 Google Cloud 项目。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
-
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
-
-
为您的 Google 帐号创建本地身份验证凭据:
gcloud auth application-default login
-
向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- 将
PROJECT_ID
替换为您的项目 ID。 - 将
EMAIL_ADDRESS
替换为您的电子邮件地址。 - 将
ROLE
替换为每个角色。
- 将
下载连接器 JAR
将连接器 JAR 文件下载到本地计算机。如需了解详情,请参阅 GitHub 自述文件中的获取连接器。
复制连接器配置文件
克隆或下载连接器的 GitHub 代码库。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
将
config
目录的内容复制到 Kafka 安装的config
子目录中。cp config/* [path to Kafka installation]/config/
这些文件包含连接器的配置设置。
更新 Kafka Connect 配置
- 进入您的 Kafka 目录。
- 在文本编辑器中打开名为
config/connect-standalone.properties
的文件。 - 如果
plugin.path property
被注释掉,请取消注释它。 更新
plugin.path property
以包含连接器 JAR 的路径。示例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
将
offset.storage.file.filename
属性设置为本地文件名。在独立模式下,Kafka 使用此文件来存储偏移数据。示例:
offset.storage.file.filename=/tmp/connect.offsets
将事件从 Kafka 转发到 Pub/Sub
本部分介绍如何启动接收器连接器、将事件发布到 Kafka,然后从 Pub/Sub 读取转发的消息。
使用 Google Cloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:要从 Kafka 接收消息的 Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:主题的 Pub/Sub 订阅的名称。
在文本编辑器中打开
/config/cps-sink-connector.properties
文件。为以下属性添加值,这些属性在注释中标记为"TODO"
:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
替换以下内容:
- KAFKA_TOPICS:要读取的 Kafka 主题的英文逗号分隔列表。
- PROJECT_ID:包含 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题。
从 Kafka 目录运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
按照 Apache Kafka 快速入门中的步骤将一些事件写入您的 Kafka 主题。
使用 gcloud CLI 从 Pub/Sub 读取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
将消息从 Pub/Sub 转发到 Kafka
本部分介绍如何启动源连接器、将消息发布到 Pub/Sub,以及从 Kafka 读取转发的消息。
使用 gcloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:Pub/Sub 订阅的名称。
在文本编辑器中打开名为
/config/cps-source-connector.properties
的文件。为以下属性添加值,这些属性在注释中标记为"TODO"
:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
替换以下内容:
- KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
- PROJECT_ID:包含 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:Pub/Sub 主题。
从 Kafka 目录运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
使用 gcloud CLI 将消息发布到 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
阅读来自 Kafka 的消息。按照 Apache Kafka 快速入门中的步骤读取 Kafka 主题中的消息。
短信转化
Kafka 记录包含一个键和一个值,它们是可变长度的字节数组。或者,Kafka 记录还可以具有标头,这些标头是键值对。Pub/Sub 消息包含两个主要部分:消息正文以及零个或多个键值对属性。
Kafka Connect 使用转换器序列化 Kafka 与 Kafka 之间的键和值。如需控制序列化,请在连接器配置文件中设置以下属性:
key.converter
:用于序列化记录键的转换器。value.converter
:用于序列化记录值的转换器。
Pub/Sub 消息的正文是 ByteString
对象,因此最有效的转换是直接复制载荷。因此,我们建议您尽可能使用可生成原始数据类型(整数、浮点型、字符串或字节架构)的转换器,以防止对同一消息正文进行反序列化和重新序列化。
从 Kafka 到 Pub/Sub
接收器连接器将 Kafka 记录转换为 Pub/Sub 消息,如下所示:
- Kafka 记录键作为名为
"key"
的属性存储在 Pub/Sub 消息中。 - 默认情况下,连接器会丢弃 Kafka 记录中的所有标头。但是,如果将
headers.publish
配置选项设置为true
,则连接器会将标头写入 Pub/Sub 属性。连接器会跳过所有超出 Pub/Sub 消息属性限制的标头。 - 对于整数、浮点数、字符串和字节架构,连接器将 Kafka 记录值的字节直接传递到 Pub/Sub 消息正文。
- 对于结构体架构,连接器将每个字段写入 Pub/Sub 消息的属性。例如,如果该字段为
{ "id"=123 }
,则生成的 Pub/Sub 消息具有"id"="123"
属性。字段值始终转换为字符串。不支持将映射和结构体类型用作结构体中的字段类型。 - 对于映射架构,连接器将每个键值对写入 Pub/Sub 消息的属性。例如,如果映射为
{"alice"=1,"bob"=2}
,则生成的 Pub/Sub 消息具有"alice"="1"
和"bob"="2"
两个属性。键和值将转换为字符串。
结构体和映射架构有一些其他行为:
(可选)您可以通过设置
messageBodyName
配置属性,将特定结构体字段或映射键指定为消息正文。字段或键的值以ByteString
的形式存储在消息正文中。如果您未设置messageBodyName
,则结构体和映射架构的消息正文为空。对于数组值,连接器仅支持基元数组类型。该数组中的值序列会串联到单个
ByteString
对象中。
从 Pub/Sub 转换为 Kafka
源连接器将 Pub/Sub 消息转换为 Kafka 记录,如下所示:
Kafka 记录键:默认情况下,该键设置为
null
。(可选)您可以通过设置kafka.key.attribute
配置选项来指定用作键的 Pub/Sub 消息特性。在这种情况下,连接器会查找具有该名称的属性,并将记录键设置为该属性值。如果指定的属性不存在,则记录键会设置为null
。Kafka 记录值。连接器按如下方式写入记录值:
如果 Pub/Sub 消息没有自定义特性,则连接器使用
value.converter
指定的转换器将 Pub/Sub 消息正文作为byte[]
类型直接写入 Kafka 记录值。如果 Pub/Sub 消息具有自定义特性且
kafka.record.headers
为false
,则连接器会向记录值写入一个结构体。该结构体包含每个特性的一个字段,以及一个名为"message"
的字段,其值为 Pub/Sub 消息正文(以字节形式存储):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
在这种情况下,您必须使用与
struct
架构兼容的value.converter
,例如org.apache.kafka.connect.json.JsonConverter
。如果 Pub/Sub 消息具有自定义属性且
kafka.record.headers
为true
,则连接器会将这些属性写入 Kafka 记录标头。它使用value.converter
指定的转换器将 Pub/Sub 消息正文作为byte[]
类型直接写入 Kafka 记录值。
Kafka 记录标头。默认情况下,标头为空,除非您将
kafka.record.headers
设置为true
。
配置选项
除了 Kafka Connect API 提供的配置之外,Pub/Sub Group Kafka Connector 还支持以下配置。
接收器连接器配置选项
接收器连接器支持以下配置选项。
设置 | 数据类型 | 说明 |
---|---|---|
connector.class |
String |
必填。连接器的 Java 类。对于 Pub/Sub 接收器连接器,该值必须为 com.google.pubsub.kafka.sink.CloudPubSubSinkConnector 。 |
cps.endpoint |
String |
要使用的 Pub/Sub 端点。 默认值: |
cps.project |
String |
必需。包含 Pub/Sub 主题的 Google Cloud。 |
cps.topic |
String |
必需。用于发布 Kafka 记录的 Pub/Sub 主题。 |
gcp.credentials.file.path |
String |
可选。指向存储用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 凭据的文件的路径。 |
gcp.credentials.json |
String |
可选。包含用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 的 JSON blob。 |
headers.publish |
Boolean |
如果为 默认值: |
maxBufferBytes |
Long |
在将主题 Kafka 分区发布到 Pub/Sub 之前,可接收的字节数上限。 默认值:10000000。 |
maxBufferSize |
Integer |
在将 Kafka 主题分区上接收的记录数上限(将记录发布到 Pub/Sub)。 默认值:100。 |
maxDelayThresholdMs |
Integer |
在将待处理记录发布到 Pub/Sub 之前等待达到 默认值:100。 |
maxOutstandingMessages |
Long |
在发布商阻止进一步发布之前,可以发出的记录数上限(包括未完成的批次和待处理的批次)。 默认值: |
maxOutstandingRequestBytes |
Long |
在发布者阻止进一步发布之前,可未完成的总字节数上限,包括未完成的批次和待处理的批次。 默认值: |
maxRequestTimeoutMs |
Integer |
向 Pub/Sub 发布各个请求的超时时间(以毫秒为单位)。 默认值:10000。 |
maxTotalTimeoutMs |
Integer |
发布到 Pub/Sub 的调用的总超时时间(以毫秒为单位),包括重试次数。 默认值:60000。 |
metadata.publish |
Boolean |
如果设置为 默认值: |
messageBodyName |
String |
使用结构体或映射值架构时,请指定要用作 Pub/Sub 消息正文的字段或键的名称。请参阅从 Kafka 到 Pub/Sub 的转换。 默认值: |
orderingKeySource |
String |
指定如何在 Pub/Sub 消息中设置排序键。可以是下列值之一:
默认值: |
topics |
String |
必需。要读取的 Kafka 主题的英文逗号分隔列表。 |
来源连接器配置选项
源连接器支持以下配置选项。
设置 | 数据类型 | 说明 |
---|---|---|
connector.class |
String |
必填。连接器的 Java 类。对于 Pub/Sub 源连接器,该值必须为 com.google.pubsub.kafka.source.CloudPubSubSourceConnector 。 |
cps.endpoint |
String |
要使用的 Pub/Sub 端点。 默认值: |
cps.makeOrderingKeyAttribute |
Boolean |
如果设置为 默认值: |
cps.maxBatchSize |
Integer |
向 Pub/Sub 发出的每个拉取请求要批量处理的消息数上限。 默认值:100 |
cps.project |
String |
必填。包含 Pub/Sub 主题的 Google Cloud 项目。 |
cps.subscription |
String |
必需。要从中拉取消息的 Pub/Sub 订阅的名称。 |
gcp.credentials.file.path |
String |
可选。指向存储用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 凭据的文件的路径。 |
gcp.credentials.json |
String |
可选。包含用于对 Pub/Sub Lite 进行身份验证的 Google Cloud 的 JSON blob。 |
kafka.key.attribute |
String |
用作发布到 Kafka 的消息的键的 Pub/Sub 消息特性。如果设置为 默认值: |
kafka.partition.count |
Integer |
发布消息的 Kafka 主题的 Kafka 分区数量。如果分区架构为 默认值:1。 |
kafka.partition.scheme |
String |
在 Kafka 中向分区分配消息的架构。可以是以下值之一:
默认值: |
kafka.record.headers |
Boolean |
如果为 |
kafka.topic |
String |
必需。从 Pub/Sub 接收消息的 Kafka 主题。 |
获取支持
如需帮助,请创建支持服务工单。 如有一般问题和讨论,请在 GitHub 代码库中创建问题。
后续步骤
- 了解 Kafka 与 Pub/Sub 之间的区别。
- 详细了解 Pub/Sub Group Kafka Connector。
- 请参阅 Pub/Sub Group Kafka Connector GitHub 代码库。