将 Pub/Sub 连接到 Apache Kafka

本文档介绍如何将 Apache Kafka 和 Pub/Sub 集成, 使用 Pub/Sub Group Kafka Connector

Pub/Sub Group Kafka Connector 简介

Apache Kafka 是一个用于流式处理事件的开源平台。通常 用于在分布式架构中实现松散通信 耦合的组件。Pub/Sub 是一种代管式服务, 异步接收消息与 Kafka 一样,您可以使用 使用 Pub/Sub 在云中的组件之间通信 架构。

借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。 以下连接器打包在连接器 JAR 中:

  • 接收器连接器从一个或多个 Kafka 主题中读取记录,并且 发布到 Pub/Sub
  • 源连接器从 Pub/Sub 主题中读取消息 并发布到 Kafka

以下是您可能会使用 Pub/Sub Group Kafka Connector 的一些场景:

  • 您正在将基于 Kafka 的架构迁移到 Google Cloud。
  • 您有一个前端系统,用于将事件存储在 Kafka 之外的 还可使用 Google Cloud 来运行一些后端。 服务,这些服务需要接收 Kafka 事件。
  • 您从本地 Kafka 解决方案收集日志并将其发送到 使用 Google Cloud 进行数据分析。
  • 您有一个使用 Google Cloud 的前端系统,但您还需要存储数据 使用 Kafka

该连接器要求 Kafka Connect 它是在 Kafka 与其他系统之间流式传输数据的框架。要使用 您必须运行 Kafka Connect 和 Kafka 集群。

本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您先了解 请完成以下任一 Pub/Sub 快速入门

Pub/Sub 连接器不支持任何集成 Google Cloud IAM 与 Kafka Connect ACL 之间。

开始使用连接器

本部分将引导您完成以下任务:

  1. 配置 Pub/Sub Group Kafka Connector。
  2. 将事件从 Kafka 发送到 Pub/Sub。
  3. 将消息从 Pub/Sub 发送到 Kafka。

前提条件

安装 Kafka

按照 Apache Kafka 快速入门 在本地机器上安装单节点 Kafka。完成下列步骤 快速入门:

  1. 下载最新的 Kafka 版本并将其解压缩。
  2. 启动 Kafka 环境。
  3. 创建 Kafka 主题。

身份验证

Pub/Sub Group Kafka Connector 必须通过 Pub/Sub 进行身份验证, 发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下操作: 请执行以下步骤:

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  6. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  7. 安装 Google Cloud CLI。
  8. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login
  11. 向您的 Google 账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。

下载连接器 JAR

将连接器 JAR 文件下载到本地计算机。如需了解详情,请参阅 获取连接器

复制连接器配置文件

  1. 克隆或下载 GitHub 代码库

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目录的内容复制到config 您的 Kafka 安装。

    cp config/* [path to Kafka installation]/config/
    

这些文件包含连接器的配置设置

更新您的 Kafka Connect 配置

  1. 导航到包含您要创建的 Kafka Connect 二进制文件的目录 已下载。
  2. 在 Kafka Connect 二进制目录中,打开名为 文本编辑器中支持 config/connect-standalone.properties
  3. 如果 plugin.path property 已被注释掉,请取消注释。
  4. 更新 plugin.path property 以包含连接器 JAR 的路径。

    示例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 属性设置为本地文件名。在 独立模式下,Kafka 使用此文件存储偏移数据。

    示例:

    offset.storage.file.filename=/tmp/connect.offsets
    

将事件从 Kafka 转发到 Pub/Sub

本部分介绍如何启动接收器连接器、将事件发布到 Kafka、 然后从 Pub/Sub 读取转发的消息。

  1. 使用 Google Cloud CLI 创建 Pub/Sub 主题, 订阅。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    替换以下内容:

    • PUBSUB_TOPIC:Cloud Pub/Sub 主题的名称 从 Kafka 接收消息
    • PUBSUB_SUBSCRIPTION:Pub/Sub 的名称 订阅该主题。
  2. 在文本编辑器中打开 /config/cps-sink-connector.properties 文件。将 这些值在"TODO" 评论:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    替换以下内容:

    • KAFKA_TOPICS:要读取的 Kafka 主题的逗号分隔列表 。
    • PROJECT_ID:包含您的 Pub/Sub 主题。
    • PUBSUB_TOPIC:用于接收 从 Kafka 读取消息。
  3. 从 Kafka 目录运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. 请按照 Apache Kafka 快速入门 将一些事件写入您的 Kafka 主题。

  5. 使用 gcloud CLI 从 Pub/Sub 中读取事件。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

将消息从 Pub/Sub 转发到 Kafka

本部分介绍如何启动源连接器,以及如何将消息发布到 Pub/Sub 并从 Kafka 读取转发的消息。

  1. 使用 gcloud CLI 创建包含以下内容的 Pub/Sub 主题: 订阅。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    替换以下内容:

    • PUBSUB_TOPIC:Pub/Sub 主题的名称。
    • PUBSUB_SUBSCRIPTION:Pub/Sub 的名称 订阅。
  2. 以文本格式打开名为 /config/cps-source-connector.properties 的文件 编辑器。为以下属性(在"TODO" 评论:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    替换以下内容:

    • KAFKA_TOPIC:要接收 Pub/Sub 消息。
    • PROJECT_ID:包含您的 Pub/Sub 主题。
    • PUBSUB_TOPIC:Pub/Sub 主题。
  3. 从 Kafka 目录运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. 使用 gcloud CLI 向 Pub/Sub 发布消息。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. 从 Kafka 读取消息。请按照 Apache Kafka 快速入门 从 Kafka 主题读取消息。

短信转化

Kafka 记录 包含一个键和一个值,它们是可变长度的字节数组。(可选) Kafka 记录还可以具有标头,标头是键值对。答 Pub/Sub 消息 包括两个主要部分:消息正文以及零个或多个键值属性。

Kafka Connect 使用转换器对传入和传出 Kafka 的键和值进行序列化。 如需控制序列化,请在连接器中设置以下属性 配置文件:

  • key.converter:用于对记录键进行序列化的转换器。
  • value.converter:用于对记录值进行序列化的转换器。

Pub/Sub 消息的正文是 ByteString 对象,因此 最高效的转换是直接复制载荷。因此, 建议使用能生成原始数据类型(整数、浮点数、 字符串或字节架构),以防止反序列化和 将相同的消息正文重新序列化。

从 Kafka 到 Pub/Sub 的转换

接收器连接器将 Kafka 记录转换为 Pub/Sub 消息, 如下:

  • Kafka 记录键以属性 "key" 的形式存储在 Pub/Sub 消息。
  • 默认情况下,连接器会丢弃 Kafka 记录中的所有标头。但是,如果 请将 headers.publish 配置选项设置为 true,即连接器 将标头作为 Pub/Sub 属性写入。连接器会跳过 任何超出 Pub/Sub 的标头 针对消息属性的限制
  • 对于整数、浮点数、字符串和字节架构,连接器会将字节 直接复制到 Pub/Sub 消息中 正文。
  • 对于结构体架构,连接器将每个字段写入 Pub/Sub 消息。例如,如果该字段为 { "id"=123 }, 生成的 Pub/Sub 消息具有 "id"="123" 属性。通过 字段值始终转换为字符串。映射和结构体类型 支持作为结构体中的字段类型。
  • 对于映射架构,连接器将每个键值对作为 Pub/Sub 消息。例如,如果地图是 {"alice"=1,"bob"=2} 时,生成的 Pub/Sub 消息将具有两个 属性,"alice"="1""bob"="2"。将键和值转换为 转换为字符串。

结构体架构和映射架构还有一些其他行为:

  • 或者,您可以将特定结构体字段或映射键指定为 方法是设置 messageBodyName 配置属性。通过 字段或键的值会存储为消息正文中的 ByteString。如果 则未设置 messageBodyName,则结构体和 映射架构

  • 对于数组值,连接器仅支持基元数组类型。通过 数组中的值序列串联成单个 ByteString 对象。

从 Pub/Sub 到 Kafka 的转换

源连接器将 Pub/Sub 消息转换为 Kafka 记录 如下所示:

  • Kafka 记录密钥:默认情况下,密钥设置为 null。您可以选择 可以指定要用作键的 Pub/Sub 消息属性, 设置 kafka.key.attribute 配置选项。在这种情况下, 会查找具有该名称的属性,并将记录键设置为 属性值。如果指定的属性不存在,则记录键为 设为 null

  • Kafka 记录值。连接器按如下方式写入记录值:

    • 如果 Pub/Sub 消息没有自定义属性,则连接器 将 Pub/Sub 消息正文直接写入 Kafka 记录 值指定为 byte[] 类型,并使用 value.converter.

    • 如果 Pub/Sub 消息具有自定义属性和 kafka.record.headersfalse,连接器将结构体写入 记录值。结构体中,每个属性对应一个字段,还有一个字段 名为 "message",其值是 Pub/Sub 消息正文 (存储为字节):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      在这种情况下,您必须使用与value.converter struct 架构,例如 org.apache.kafka.connect.json.JsonConverter

    • 如果 Pub/Sub 消息具有自定义属性和 kafka.record.headerstrue,连接器将属性编写为 Kafka 记录标头。它会将 Pub/Sub 消息正文 使用转换器直接发送到 byte[] 类型的 Kafka 记录值 由 value.converter 指定。

  • Kafka 记录标头。默认情况下,标头是空的,除非您将 kafka.record.headerstrue

配置选项

除了 Kafka Connect API 提供的配置外, Pub/Sub Group Kafka Connector 支持以下配置。

接收器连接器配置选项

接收器连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub 接收器连接器,则值必须为 com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

要使用的 Pub/Sub 端点。

默认值:"pubsub.googleapis.com:443"

cps.project String 必需。包含 Pub/Sub 主题。
cps.topic String 必需。要发布的 Pub/Sub 主题 Kafka 记录至。
gcp.credentials.file.path String 可选。存储 Google Cloud 凭据的文件的路径 用于对 Pub/Sub Lite 进行身份验证。
gcp.credentials.json String 可选。一个包含 Google Cloud 的 JSON blob, Pub/Sub Lite 进行身份验证。
headers.publish Boolean

如果为 true,请添加所有 Kafka 记录标头,如下所示: Pub/Sub 消息特性。

默认值:false

maxBufferBytes Long

主题 Kafka 分区上可接收的字节数上限 然后再将其发布到 Pub/Sub

默认值:10000000。

maxBufferSize Integer

Kafka 主题分区上接收的最大记录数 然后再将其发布到 Pub/Sub

默认值:100。

maxDelayThresholdMs Integer

等待达到的最长时间 maxBufferSizemaxBufferBytes(发布前) 流向 Pub/Sub 的未完成记录(以毫秒为单位)。

默认值:100。

maxOutstandingMessages Long

可以未完成的记录数上限,包括 未完成和待完成的批次,然后发布商进一步屏蔽 发布。

默认值:Long.MAX_VALUE

maxOutstandingRequestBytes Long

可以未完成的总字节数上限,包括 未完成和待完成的批次,然后发布商进一步屏蔽 发布。

默认值:Long.MAX_VALUE

maxRequestTimeoutMs Integer

向 Pub/Sub 发出的单个发布请求的超时; (以毫秒为单位)。

默认值:10000。

maxTotalTimeoutMs Integer

调用发布到的超时总超时(以毫秒为单位) Pub/Sub,包括重试。

默认值:60000。

metadata.publish Boolean

如果为 true,包括 Kafka 主题、分区、偏移 和时间戳作为 Pub/Sub 消息属性。

默认值:false

messageBodyName String

使用结构体或映射值架构时,指定 字段或键。请参阅 从 Kafka 到 Pub/Sub 的转换

默认值:"cps_message_body"

orderingKeySource String

指定如何在 Pub/Sub 中设置排序键 消息。可以是下列值之一:

  • none:不要设置排序键。
  • key:使用 Kafka 记录键作为排序键。
  • partition:使用分区号(已转换为 字符串作为排序键。仅针对低吞吐量使用此设置 或具有数千个分区的主题。

默认值:none

topics String 必需。以英文逗号分隔的 Kafka 主题列表, 读取内容。

来源连接器配置选项

源连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub 源连接器,则值必须为 com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

要使用的 Pub/Sub 端点。

默认值:"pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

如果为 true,则将排序键写入 Kafka 记录。 使用与 Pub/Sub 消息属性相同的格式。 请参阅从 Pub/Sub 转换为 Kafka 记录

默认值:false

cps.maxBatchSize Integer

每个拉取请求要批处理的消息数量上限 Pub/Sub。

默认值:100

cps.project String 必填。包含 Pub/Sub 主题。
cps.subscription String 必需。Pub/Sub 的名称 从哪个订阅拉取消息。
gcp.credentials.file.path String 可选。存储 Google Cloud 凭据的文件的路径 用于对 Pub/Sub Lite 进行身份验证。
gcp.credentials.json String 可选。一个包含 Google Cloud 的 JSON blob, Pub/Sub Lite 进行身份验证。
kafka.key.attribute String

用作键的 Pub/Sub 消息属性 这些消息会发布到 Kafka。如果设置为 "orderingKey",则使用 消息的排序键。如果为 null,则 Kafka 记录没有 键。

默认值:null

kafka.partition.count Integer

Kafka 主题中消息所在的 Kafka 分区的数量 应用。如果分区架构是 "kafka_partitioner".

默认值:1。

kafka.partition.scheme String

用于将消息分配给 Kafka 中的分区的方案。可以是 下列任一值:

  • round_robin:以轮循方式分配分区 。
  • hash_key:通过对记录进行哈希处理来查找分区 键。
  • hash_value:通过对记录进行哈希处理来查找分区 值。
  • kafka_partitioner:将分区逻辑委托给 Kafka Producer。默认情况下,Kafka 提供方会自动检测 并执行基于 murmur 哈希的分区 映射或轮循,具体取决于是否提供了记录键。
  • ordering_key:使用消息的哈希代码 排序键。如果没有排序键,则使用 round_robin.

默认值:round_robin

kafka.record.headers Boolean

如果为 true,则写入 Pub/Sub 消息特性 作为 Kafka 标头

kafka.topic String 必需。从以下来源接收消息的 Kafka 主题: Pub/Sub。

获取支持

如果您需要帮助,请创建支持服务工单。 对于一般性问题和讨论,请创建问题 (位于 GitHub 代码库中)。

后续步骤