将 Pub/Sub 连接到 Apache Kafka

本文档介绍了如何使用 Pub/Sub Group Kafka Connector 集成 Apache Kafka 和 Pub/Sub。

Pub/Sub Group Kafka Connector 简介

Apache Kafka 是一个用于流式传输事件的开源平台。它通常用于分布式架构,可在松散耦合的各组件之间实现通信。Pub/Sub 是一种用于异步发送和接收消息的代管式服务。与 Kafka 一样,您可以使用 Pub/Sub 在云架构中的各个组件之间进行通信。

借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。以下连接器打包在 Connector JAR 中:

  • 接收器连接器会从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub。
  • 源连接器会从 Pub/Sub 主题读取消息,并将其发布到 Kafka。

以下是您可能需要使用 Pub/Sub Group Kafka Connector 的一些场景:

  • 您要将基于 Kafka 的架构迁移到 Google Cloud。
  • 您有一个前端系统,该系统会将事件存储在 Google Cloud 之外的 Kafka 中,但您还使用 Google Cloud 运行一些后端服务,这些服务需要接收 Kafka 事件。
  • 您从本地 Kafka 解决方案收集日志,并将其发送到 Google Cloud 进行数据分析。
  • 您有一个使用 Google Cloud 的前端系统,但您还使用 Kafka 在本地存储数据。

该连接器需要 Kafka Connect,它是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用该连接器,您必须将 Kafka Connect 与 Kafka 集群一起运行。

本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您先完成某个 Pub/Sub 快速入门

Pub/Sub 连接器不支持 Google Cloud IAM 和 Kafka Connect ACL 之间的任何集成。

连接器使用入门

本部分将引导您完成以下任务:

  1. 配置 Pub/Sub Group Kafka Connector。
  2. 将事件从 Kafka 发送到 Pub/Sub。
  3. 将消息从 Pub/Sub 发送到 Kafka。

前提条件

安装 Kafka

按照 Apache Kafka 快速入门中的说明在本地机器上安装单节点 Kafka。请完成快速入门中的以下步骤:

  1. 下载最新的 Kafka 版本并将其解压缩。
  2. 启动 Kafka 环境。
  3. 创建 Kafka 主题。

身份验证

Pub/Sub Group Kafka Connector 必须通过 Pub/Sub 进行身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

下载连接器 JAR

将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 GitHub 自述文件中的获取连接器部分。

复制连接器配置文件

  1. 克隆或下载相应连接器的 GitHub 代码库

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目录的内容复制到 Kafka 安装目录的 config 子目录中。

    cp config/* [path to Kafka installation]/config/
    

这些文件包含连接器的配置设置

更新 Kafka Connect 配置

  1. 前往包含您下载的 Kafka Connect 二进制文件的目录。
  2. 在 Kafka Connect 二进制目录中,使用文本编辑器打开名为 config/connect-standalone.properties 的文件。
  3. 如果 plugin.path property 被注释掉,请取消注释。
  4. 更新 plugin.path property,以包含连接器 JAR 的路径。

    示例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 属性设置为本地文件名。在独立模式下,Kafka 使用此文件存储偏移量数据。

    示例:

    offset.storage.file.filename=/tmp/connect.offsets
    

将事件从 Kafka 转发到 Pub/Sub

本部分介绍了如何启动接收器连接器、将事件发布到 Kafka,然后从 Pub/Sub 读取转发的消息。

  1. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub 主题。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    替换以下内容:

    • PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题的名称。
    • PUBSUB_SUBSCRIPTION:该主题的 Pub/Sub 订阅的名称。
  2. 在文本编辑器中打开 /config/cps-sink-connector.properties 文件。为以下在注释中标记为 "TODO" 的属性添加值:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    替换以下内容:

    • KAFKA_TOPICS:要从中读取的 Kafka 主题的英文逗号分隔列表。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题。
  3. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. 按照 Apache Kafka 快速入门中的步骤,将一些事件写入 Kafka 主题。

  5. 使用 gcloud CLI 从 Pub/Sub 读取事件。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

将消息从 Pub/Sub 转发到 Kafka

本部分介绍了如何启动源连接器、向 Pub/Sub 发布消息以及从 Kafka 读取转发的消息。

  1. 使用 gcloud CLI 创建一个具有订阅的 Pub/Sub 主题。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    替换以下内容:

    • PUBSUB_TOPIC:Pub/Sub 主题的名称。
    • PUBSUB_SUBSCRIPTION:Pub/Sub 订阅的名称。
  2. 在文本编辑器中打开名为 /config/cps-source-connector.properties 的文件。为以下在注释中标记为 "TODO" 的属性添加值:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    替换以下内容:

    • KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • PUBSUB_TOPIC:Pub/Sub 主题。
  3. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. 使用 gcloud CLI 将消息发布到 Pub/Sub。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. 从 Kafka 读取消息。按照 Apache Kafka 快速入门中的步骤读取 Kafka 主题中的消息。

短信转化

Kafka 记录包含一个键和一个值,它们是长度可变的字节数组。Kafka 记录还可以包含标头(即键值对),这不是必需的。Pub/Sub 消息包含两个主要部分:消息正文和零个或多个键值对属性。

Kafka Connect 使用转换器将键和值序列化到 Kafka 和从 Kafka 序列化。如需控制序列化,请在连接器配置文件中设置以下属性:

  • key.converter:用于序列化记录键的转换器。
  • value.converter:用于序列化记录值的转换器。

Pub/Sub 消息的正文是 ByteString 对象,因此最有效的转换是直接复制载荷。因此,我们建议尽可能使用生成基元数据类型(整数、浮点数、字符串或字节架构)的转换器,以防止对同一消息正文进行反序列化和重新序列化。

从 Kafka 转换为 Pub/Sub

接收器连接器会将 Kafka 记录转换为 Pub/Sub 消息,如下所示:

  • Kafka 记录键会以名为 "key" 的属性存储在 Pub/Sub 消息中。
  • 默认情况下,连接器会丢弃 Kafka 记录中的所有标头。不过,如果您将 headers.publish 配置选项设置为 true,连接器会将标头写入为 Pub/Sub 属性。连接器会跳过超出 Pub/Sub 消息属性限制的所有标头。
  • 对于整数、浮点数、字符串和字节架构,连接器会将 Kafka 记录值的字节直接传递到 Pub/Sub 消息正文。
  • 对于结构体架构,连接器会将每个字段写入为 Pub/Sub 消息的属性。例如,如果字段为 { "id"=123 },则生成的 Pub/Sub 消息将具有属性 "id"="123"。字段值始终会转换为字符串。不支持将映射和结构体类型用作结构体中的字段类型。
  • 对于映射架构,连接器会将每个键值对写入为 Pub/Sub 消息的属性。例如,如果映射为 {"alice"=1,"bob"=2},则生成的 Pub/Sub 消息具有两个属性:"alice"="1""bob"="2"。键和值会转换为字符串。

结构体和映射架构有一些额外的行为:

  • (可选)您可以通过设置 messageBodyName 配置属性,指定特定结构体字段或映射键作为消息正文。字段或键的值会以 ByteString 的形式存储在邮件正文中。如果您未设置 messageBodyName,则结构体和映射架构的消息正文为空。

  • 对于数组值,连接器仅支持基元数组类型。数组中的值序列会串联成单个 ByteString 对象。

从 Pub/Sub 转换为 Kafka

源连接器会将 Pub/Sub 消息转换为 Kafka 记录,如下所示:

  • Kafka 记录键:默认情况下,该键设置为 null。您还可以选择通过设置 kafka.key.attribute 配置选项来指定要用作键的 Pub/Sub 消息属性。在这种情况下,连接器会查找具有该名称的属性,并将记录键设置为属性值。如果指定的属性不存在,则将记录键设置为 null

  • Kafka 记录值。连接器会按如下方式写入记录值:

    • 如果 Pub/Sub 消息没有自定义属性,连接器会使用 value.converter 指定的转换器,将 Pub/Sub 消息正文直接作为 byte[] 类型写入 Kafka 记录值。

    • 如果 Pub/Sub 消息具有自定义属性且 kafka.record.headersfalse,则连接器会将结构体写入记录值。该结构体包含每个属性对应的一个字段,以及一个名为 "message" 的字段,其值为 Pub/Sub 消息正文(存储为字节):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      在这种情况下,您必须使用与 struct 架构兼容的 value.converter,例如 org.apache.kafka.connect.json.JsonConverter

    • 如果 Pub/Sub 消息具有自定义属性且 kafka.record.headerstrue,则连接器会将这些属性写入为 Kafka 记录标头。它使用 value.converter 指定的转换器,将 Pub/Sub 消息正文直接作为 byte[] 类型写入 Kafka 记录值。

  • Kafka 记录标头。默认情况下,标头为空,除非您将 kafka.record.headers 设置为 true

配置选项

除了 Kafka Connect API 提供的配置之外,Pub/Sub Group Kafka Connector 还支持接收器和源配置,如 Pub/Sub 连接器配置中所述。

获取支持

如果您需要帮助,请创建支持服务工单。对于常规问题和讨论,请在 GitHub 代码库中创建问题。

后续步骤