将 Pub/Sub 连接到 Apache Kafka

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

本文档介绍如何使用 Pub/Sub 群组 Kafka 连接器集成 Apache Kafka 和 Pub/Sub。

关于 Pub/Sub 群组 Kafka 连接器

Apache Kafka 是一个用于流式传输事件的开源平台。此技术通常用在分布式架构中,可以用来松散耦合的组件之间进行通信。Pub/Sub 是一种用于异步发送和接收消息的代管式服务。与 Kafka 一样,您可以使用 Pub/Sub 在云架构中的组件之间进行通信。

借助 Pub/Sub Group Kafka 连接器,您可以集成这两个系统。以下连接器打包在连接器 JAR 中:

  • 接收器连接器会从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub。
  • 源连接器从 Pub/Sub 主题读取消息并将其发布到 Kafka。

在一些场景中,您可以使用 Pub/Sub 群组 Kafka 连接器:

  • 您正在将基于 Kafka 的架构迁移到 Google Cloud。
  • 您有一个前端系统,将事件存储在 Google Cloud 之外的 Kafka 中,但您也可以使用 Google Cloud 运行一些需要接收 Kafka 事件的后端服务。
  • 您可以从本地 Kafka 解决方案收集日志并将其发送到 Google Cloud 进行数据分析。
  • 您拥有使用 Google Cloud 的前端系统,但也使用 Kafka 在本地存储数据。

连接器需要 Kafka Connect,这是在 Kafka 与其他系统之间流式传输数据的框架。如需使用该连接器,您必须将 Kafka Connect 与 Kafka 集群一起运行。

本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您完成一个 Pub/Sub 快速入门

开始使用连接器

本部分将引导您完成以下任务:

  1. 配置 Pub/Sub 群组 Kafka 连接器。
  2. 将事件从 Kafka 发送到 Pub/Sub。
  3. 从 Pub/Sub 向 Kafka 发送消息。

前提条件

安装 Kafka

按照 Apache Kafka 快速入门在本地机器上安装单节点 Kafka。在快速入门中完成以下步骤:

  1. 下载最新的 Kafka 版本并进行提取。
  2. 启动 Kafka 环境。
  3. 创建 Kafka 主题。

身份验证

Pub/Sub 组 Kafka 连接器必须通过 Pub/Sub 进行身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目。

    • 创建 Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Cloud 项目:

      gcloud config set project PROJECT_ID
  5. 为您的 Google 帐号创建身份验证凭据:

    gcloud auth application-default login
  6. 向您的 Google 帐号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。
  7. 安装 Google Cloud CLI。
  8. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  9. 创建或选择 Google Cloud 项目。

    • 创建 Cloud 项目:

      gcloud projects create PROJECT_ID
    • 选择您创建的 Cloud 项目:

      gcloud config set project PROJECT_ID
  10. 为您的 Google 帐号创建身份验证凭据:

    gcloud auth application-default login
  11. 向您的 Google 帐号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • EMAIL_ADDRESS 替换为您的电子邮件地址。
    • ROLE 替换为每个角色。

下载连接器 JAR

将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 GitHub 自述文件中的获取连接器

复制连接器配置文件

  1. 克隆或下载连接器的 GitHub 代码库

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目录的内容复制到 Kafka 安装的 config 子目录中。

    cp config/* [path to Kafka installation]/config/
    

这些文件包含连接器的配置设置

更新 Kafka Connect 配置

  1. 导航到 Kafka 目录。
  2. 在文本编辑器中打开名为 config/connect-standalone.properties 的文件。
  3. 如果 plugin.path property 被注释掉,请取消备注。
  4. 更新 plugin.path property 以包含连接器 JAR 的路径。

    示例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 属性设置为本地文件名。在独立模式下,Kafka 使用此文件存储偏移数据。

    示例:

    offset.storage.file.filename=/tmp/connect.offsets
    

将事件从 Kafka 转发到 Pub/Sub

本部分介绍如何启动接收器连接器、将事件发布到 Kafka,然后从 Pub/Sub 读取转发的消息。

  1. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub 主题。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    请替换以下内容:

    • PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题的名称。
    • PUBSUB_SUBSCRIPTION:主题的 Pub/Sub 订阅的名称。
  2. 在文本编辑器中打开文件 /config/cps-sink-connector.properties。为以下属性添加值(在注释中标记为 "TODO"):

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    请替换以下内容:

    • KAFKA_TOPICS:要读取的 Kafka 主题的英文逗号分隔列表。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题。
  3. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. 按照 Apache Kafka 快速入门中的步骤,将一些事件写入您的 Kafka 主题。

  5. 使用 gcloud CLI 从 Pub/Sub 读取事件。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

将消息从 Pub/Sub 转发到 Kafka

本部分介绍如何启动源连接器、将消息发布到 Pub/Sub,以及从 Kafka 读取转发的消息。

  1. 使用 gcloud CLI 创建一个带有订阅的 Pub/Sub 主题。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    请替换以下内容:

    • PUBSUB_TOPIC:Pub/Sub 主题的名称。
    • PUBSUB_SUBSCRIPTION:Pub/Sub 订阅的名称。
  2. 在文本编辑器中打开名为 /config/cps-source-connector.properties 的文件。为以下属性添加值(在注释中标记为 "TODO"):

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    请替换以下内容:

    • KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • PUBSUB_TOPIC:Pub/Sub 主题。
  3. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. 使用 gcloud CLI 将消息发布到 Pub/Sub。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. 从 Kafka 读取消息。按照 Apache Kafka 快速入门中的步骤从 Kafka 主题读取消息。

短信转化

Kafka 记录包含一个键和一个值,它们是可变长度的字节数组。或者,Kafka 记录还可以包含标头,即键值对。Pub/Sub 消息包含两个主要部分:消息正文和零个或零个以上的键值对特性。

Kafka Connect 使用转换器将键和值序列化到 Kafka 以及从 Kafka 序列化键和值。如需控制序列化,请在连接器配置文件中设置以下属性:

  • key.converter:用于序列化记录键的转换器。
  • value.converter:用于序列化记录值的转换器。

Pub/Sub 消息的正文是 ByteString 对象,因此最高效的转换是直接复制载荷。因此,我们建议您尽可能使用可生成原始数据类型(整数、浮点、字符串或字节架构)的转换器,以防止同一消息正文进行反序列化和反序列化。

从 Kafka 转换为 Pub/Sub

接收器连接器将 Kafka 记录转换为 Pub/Sub 消息,如下所示:

  • Kafka 记录键作为名为 "key" 的特性存储在 Pub/Sub 消息中。
  • 默认情况下,连接器会丢弃 Kafka 记录中的所有标头。但是,如果您将 headers.publish 配置选项设置为 true,则连接器会将标头写入为 Pub/Sub 特性。连接器会跳过超出 Pub/Sub 消息特性限制的所有标头。
  • 对于整数、浮点、字符串和字节的架构,连接器将 Kafka 记录值的字节直接传递到 Pub/Sub 消息正文。
  • 对于结构体架构,连接器将每个字段作为 Pub/Sub 消息的特性写入。例如,如果该字段为 { "id"=123 },则生成的 Pub/Sub 消息具有 "id"="123" 特性。该字段值始终会转换为字符串。
  • 对于地图架构,该连接器将每个键值对作为 Pub/Sub 消息的一个属性进行写入。例如,如果映射为 {"alice"=1,"bob"=2},则生成的 Pub/Sub 消息将具有 "alice"="1""bob"="2" 两个属性。键和值将转换为字符串。

结构架构和映射架构还有一些其他行为:

  • (可选)您可以通过设置 messageBodyName 配置属性,将特定结构体字段或映射键指定为消息正文。字段或键的值将作为 ByteString 存储在消息正文中。如果未设置 messageBodyName,则对于结构体和映射架构,消息正文为空。

  • 对于数组值,连接器仅支持原始数组类型。数组中的值序列会串联到单个 ByteString 对象中。

从 Pub/Sub 转换为 Kafka

来源连接器将 Pub/Sub 消息转换为 Kafka 记录,如下所示:

  • Kafka 记录键:默认情况下,键设置为 null。(可选)您可以通过设置 kafka.key.attribute 配置选项,指定要用作键的 Pub/Sub 消息特性。在这种情况下,连接器会查找具有该名称的属性,并将记录键设置为该属性值。如果指定的特性不存在,则记录键会设置为 null

  • Kafka 记录值。连接器按如下方式写入记录值:

    • 如果 Pub/Sub 消息没有自定义特性,则连接器使用 value.converter 指定的转换器将 Pub/Sub 消息正文直接写入 byte[] 类型的 Kafka 记录值。

    • 如果 Pub/Sub 消息具有自定义特性,并且 kafka.record.headersfalse,则连接器会将结构体写入记录值。结构体会为每个属性包含一个字段,以及一个名为 "message" 的字段,其值为 Pub/Sub 消息正文(以字节形式存储):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      在这种情况下,您必须使用与 struct 架构兼容的 value.converter,例如 org.apache.kafka.connect.json.JsonConverter

    • 如果 Pub/Sub 消息具有自定义特性,并且 kafka.record.headerstrue,则连接器会将特性作为 Kafka 记录标头写入。它使用 value.converter 指定的转换器将 Pub/Sub 消息正文直接作为 byte[] 类型写入 Kafka 记录值。

  • Kafka 记录标头。默认情况下,除非您将 kafka.record.headers 设置为 true,否则标头为空。

配置选项

除了 Kafka Connect API 提供的配置之外,Pub/Sub 群组 Kafka 连接器还支持以下配置。

接收器连接器配置选项

接收器连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub 接收器连接器,该值必须为 com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
cps.endpoint String

要使用的 Pub/Sub 端点。

默认值:"pubsub.googleapis.com:443"

cps.project String 必需。包含 Pub/Sub 主题的 Google Cloud。
cps.topic String 必需。要将 Kafka 记录发布到的 Pub/Sub 主题。
gcp.credentials.file.path String 可选。用于存储 Google Cloud 凭据以对 Pub/Sub Lite 进行身份验证的文件的路径。
gcp.credentials.json String 可选。一个 JSON blob,包含用于对 Pub/Sub Lite 进行身份验证的 Google Cloud。
headers.publish Boolean

如果为 true,请将任何 Kafka 记录标头添加为 Pub/Sub 消息特性。

默认值:false

maxBufferBytes Long

在将主题发布到 Pub/Sub 之前,主题 Kafka 分区上要接收的字节数上限。

默认值:10000000。

maxBufferSize Integer

在将消息发布到 Pub/Sub 之前,在 Kafka 主题分区上接收的记录数上限。

默认值:100。

maxDelayThresholdMs Integer

在将待处理记录发布到 Pub/Sub 之前,等待达到 maxBufferSizemaxBufferBytes 的最长时间(以毫秒为单位)。

默认值:100。

maxOutstandingMessages Long

在发布商禁止进一步发布之前,可以未完成的记录数上限(包括未完成的批次和待处理批量请求)。

默认值:Long.MAX_VALUE

maxOutstandingRequestBytes Long

在发布商禁止进一步发布之前,可以完成的总字节数上限(包括未完成的批次和待处理批次)。

默认值:Long.MAX_VALUE

maxRequestTimeoutMs Integer

发送至 Pub/Sub 的单个发布请求的超时(以毫秒为单位)。

默认值:10000。

maxTotalTimeoutMs Integer

发布到 Pub/Sub 的调用的总超时时间(以毫秒为单位),包括重试。

默认值:60000。

metadata.publish Boolean

如果为 true,请将 Kafka 主题、分区、偏移量和时间戳添加为 Pub/Sub 消息特性。

默认值:false

messageBodyName String

使用结构体或映射值架构时,指定要用作 Pub/Sub 消息正文的字段或键的名称。请参阅从 Kafka 转换为 Pub/Sub

默认值:"cps_message_body"

orderingKeySource String

指定如何在 Pub/Sub 消息中设置排序键。可以是下列值之一:

  • none:不设置排序键。
  • key:使用 Kafka 记录键作为排序键。
  • partition:将分区号(已转换为字符串)用作排序键。此设置仅适用于低吞吐量主题或具有数千个分区的主题。

默认值:none

topics String 必需。要读取的 Kafka 主题列表(以英文逗号分隔)。

来源连接器配置选项

来源连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub 源连接器,该值必须为 com.google.pubsub.kafka.source.CloudPubSubSourceConnector
cps.endpoint String

要使用的 Pub/Sub 端点。

默认值:"pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

值为 true 时,使用与 Pub/Sub 消息属性相同的格式将排序键写入 Kafka 记录。请参阅从 Pub/Sub 转换为 Kafka 记录

默认值:false

cps.maxBatchSize Integer

每次向 Pub/Sub 拉取请求时要批量处理的消息数量上限。

默认值:100

cps.project String 必填。包含 Pub/Sub 主题的 Google Cloud 项目。
cps.subscription String 必需。要从中拉取消息的 Pub/Sub 订阅的名称。
gcp.credentials.file.path String 可选。用于存储 Google Cloud 凭据以对 Pub/Sub Lite 进行身份验证的文件的路径。
gcp.credentials.json String 可选。一个 JSON blob,包含用于对 Pub/Sub Lite 进行身份验证的 Google Cloud。
kafka.key.attribute String

用作发布到 Kafka 的消息的 Pub/Sub 消息特性。如果设置为 "orderingKey",请使用消息的排序键。如果为 null,则 Kafka 记录没有密钥。

默认值:null

kafka.partition.count Integer

用于发布消息的 Kafka 主题的 Kafka 分区数量。如果分区架构为 "kafka_partitioner",则系统会忽略此参数。

默认值:1。

kafka.partition.scheme String

用于向 Kafka 中的分区分配消息的方案。可以是以下值之一:

  • round_robin:以轮循方式分配分区。
  • hash_key:通过对记录键进行哈希处理来查找分区。
  • hash_value:通过对记录值进行哈希处理来查找分区。
  • kafka_partitioner:将分区逻辑委托给 Kafka 提供方。默认情况下,Kafka 提供方会自动检测分区数量并执行基于哈希哈希的分区映射或轮循,具体取决于是否提供了记录键。
  • ordering_key:使用消息的排序键的哈希代码。如果没有排序键,请使用 round_robin

默认值:round_robin

kafka.record.headers Boolean

如果为 true,请将 Pub/Sub 消息特性写入 Kafka 标头。

kafka.topic String 必需。从 Pub/Sub 接收消息的 Kafka 主题。

后续步骤