将 Pub/Sub Lite 连接到 Apache Kafka

本文档介绍了如何集成 Apache Kafka 和 Pub/Sub Lite,具体方法为: 使用 Pub/Sub Group Kafka Connector

Pub/Sub Group Kafka Connector 简介

Apache Kafka 是一个用于流式处理事件的开源平台。它通常用于分布式架构,可在松散耦合的各组件之间实现通信。Pub/Sub Lite 是一种代管式服务, 异步接收消息与 Kafka 一样,您可以使用 Pub/Sub Lite 在云架构中的各个组件之间进行通信。

借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。 以下连接器打包在 Connector JAR 中:

  • 接收器连接器可从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub Lite。
  • 来源连接器从 Pub/Sub 精简版主题中读取消息 并发布到 Kafka

以下是您可能需要使用 Pub/Sub Group Kafka Connector 的一些场景:

  • 您正在将基于 Kafka 的架构迁移到 Google Cloud。
  • 您有一个前端系统,用于将事件存储在 Kafka 之外的 还可使用 Google Cloud 来运行一些后端。 服务,这些服务需要接收 Kafka 事件。
  • 您从本地 Kafka 解决方案收集日志,并将其发送到 Google Cloud 进行数据分析。
  • 您有一个使用 Google Cloud 的前端系统,但您还需要存储数据 使用 Kafka

该连接器需要 Kafka Connect,它是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用该连接器,您必须将 Kafka Connect 与 Kafka 集群一起运行。

本文档假定您熟悉 Kafka 和 Pub/Sub Lite。如需开始使用 Pub/Sub 精简版,请参阅使用 Google Cloud 控制台在 Pub/Sub 精简版中发布和接收消息

Pub/Sub Group Kafka Connector 使用入门

本部分将引导您完成以下任务:

  1. 配置 Pub/Sub Group Kafka Connector。
  2. 将事件从 Kafka 发送到 Pub/Sub 精简版。
  3. 将消息从 Pub/Sub Lite 发送到 Kafka。

前提条件

安装 Kafka

按照 Apache Kafka 快速入门 在本地机器上安装单节点 Kafka。在快速入门中完成以下步骤:

  1. 下载最新的 Kafka 版本并将其解压缩。
  2. 启动 Kafka 环境。
  3. 创建 Kafka 主题。

身份验证

Pub/Sub Group Kafka Connector 必须通过 Pub/Sub 进行身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下操作: 请执行以下步骤:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

下载连接器 JAR

将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 获取连接器

复制连接器配置文件

  1. 克隆或下载相应连接器的 GitHub 代码库

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config 目录的内容复制到config 您的 Kafka 安装。

    cp config/* [path to Kafka installation]/config/
    

这些文件包含连接器的配置设置

更新 Kafka Connect 配置

  1. 前往包含您下载的 Kafka Connect 二进制文件的目录。
  2. 在 Kafka Connect 二进制目录中,打开名为 文本编辑器中支持 config/connect-standalone.properties
  3. 如果 plugin.path property 被注释掉,请取消注释。
  4. 更新 plugin.path property 以包含连接器 JAR 的路径。

    示例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename 属性设置为本地文件名。在独立模式下,Kafka 使用此文件存储偏移数据。

    示例:

    offset.storage.file.filename=/tmp/connect.offsets
    

将事件从 Kafka 转发到 Pub/Sub Lite

本部分介绍如何启动接收器连接器、将事件发布到 Kafka、 然后从 Pub/Sub Lite 读取转发的消息。

  1. 使用 Google Cloud CLI 创建 Pub/Sub 精简版预留。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    替换以下内容:

    • RESERVATION_NAME:Pub/Sub Lite 的名称 预留。
    • LOCATION:预留的位置
  2. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    替换以下内容:

    • LITE_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub Lite 主题的名称。
    • LOCATION:主题的位置。该值必须与 预留的位置
    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LITE_SUBSCRIPTION:该主题的 Pub/Sub Lite 订阅的名称。
  3. 在文本编辑器中打开 /config/pubsub-lite-sink-connector.properties 文件。为以下在注释中标记为 "TODO" 的属性添加值:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC

    替换以下内容:

    • KAFKA_TOPICS:要读取的 Kafka 主题的逗号分隔列表 。
    • PROJECT_ID:包含您的 Pub/Sub 精简版主题。
    • LOCATION:Pub/Sub 精简版主题的位置。
    • LITE_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub Lite 主题。
  4. 在 Kafka 目录中,运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. 按照 Apache Kafka 快速入门中的步骤,将一些事件写入 Kafka 主题。

  6. 使用从精简版订阅接收消息中所述的任一方法订阅 Pub/Sub Lite 订阅。

将消息从 Pub/Sub Lite 转发到 Kafka

本部分介绍如何启动源连接器,以及如何将消息发布到 Pub/Sub Lite,并从 Kafka 读取转发的消息。

  1. 使用 Google Cloud CLI 创建 Pub/Sub 精简版预留。

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    替换以下内容:

    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LOCATION:预留的位置
  2. 使用 Google Cloud CLI 创建包含订阅的 Pub/Sub Lite 主题。

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    替换以下内容:

    • LITE_TOPIC:Pub/Sub 精简版主题的名称。
    • LOCATION:主题的位置。该值必须与 预留的位置
    • RESERVATION_NAME:Pub/Sub Lite 预订的名称。
    • LITE_SUBSCRIPTION:该主题的 Pub/Sub Lite 订阅的名称。
  3. 在以下位置打开名为 /config/pubsub-lite-source-connector.properties 的文件: 文本编辑器。为以下属性添加值,这些属性值会被标记为 "TODO"

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION

    替换以下内容:

    • KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
    • PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
    • LOCATION:Pub/Sub 精简版主题的位置。
    • LITE_SUBSCRIPTION:Pub/Sub Lite 主题。
  4. 从 Kafka 目录运行以下命令:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. 使用向精简版主题发布消息中所述的任一方法将消息发布到 Pub/Sub 精简版主题。

  6. 从 Kafka 读取消息。请按照 Apache Kafka 快速入门 从 Kafka 主题读取消息。

短信转化

Kafka 记录包含一个键和一个值,它们是长度可变的字节数组。(可选) Kafka 记录还可以具有标头,标头是键值对。Pub/Sub 精简版消息包含以下字段:

  • key:消息键 (bytes)
  • data:消息数据 (bytes)
  • attributes:零个或多个属性。每个属性都是一个 (key,values[]) 映射。一个属性可以有多个值。
  • event_time:可选的用户提供的事件时间戳。

Kafka Connect 使用转换器将键和值序列化到 Kafka 和从 Kafka 序列化。如需控制序列化,请在连接器配置文件中设置以下属性:

  • key.converter:用于序列化记录键的转换器。
  • value.converter:用于对记录值进行序列化的转换器。

从 Kafka 转换为 Pub/Sub Lite

接收器连接器将 Kafka 记录转换为 Pub/Sub Lite 消息 ,如下所示。

Kafka 记录 (SinkRecord) Pub/Sub Lite 消息
key
data
标头 attributes
时间戳 eventTime
时间戳类型 attributes["x-goog-pubsublite-source-kafka-event-time-type"]
主题 attributes["x-goog-pubsublite-source-kafka-topic"]
分区 attributes["x-goog-pubsublite-source-kafka-offset"]
偏移值 attributes["x-goog-pubsublite-source-kafka-partition"]

键、值和标头按如下方式编码:

  • null 架构被视为字符串架构。
  • 字节载荷会直接写入,无需转换。
  • 字符串、整数和浮点载荷会编码为 UTF-8 字节序列。
  • 所有其他载荷都会编码到协议缓冲区中 Value 然后转换为字节字符串。
    • 嵌套字符串字段会编码为 protobuf Value
    • 嵌套字节字段编码为一个 protobuf Value,用于保存 base64 编码的字节。
    • 嵌套的数字字段会编码为双精度值,并存储在 protobuf Value 中。
    • 不支持使用数组、映射或结构体键的映射。

从 Pub/Sub Lite 到 Kafka 的转换

源连接器会将 Pub/Sub Lite 消息转换为 Kafka 记录,如下所示:

Pub/Sub Lite 消息 Kafka 记录 (SourceRecord)
key
data
attributes 标头
event_time 时间戳。如果 event_time 不存在,则发布 所用的时间。

配置选项

除了 Kafka Connect API 提供的配置之外,该连接器还支持以下 Pub/Sub Lite 配置。

Sink 连接器配置选项

Sink 连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub Lite 接收器连接器,该值必须为 com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
gcp.credentials.file.path String 可选。存储 Google Cloud 凭据的文件的路径 用于对 Pub/Sub Lite 进行身份验证。
gcp.credentials.json String 可选。一个 JSON 数据块,用于包含用于对 Pub/Sub 精简版进行身份验证的 Google Cloud 凭据。
pubsublite.location String 必需。Pub/Sub Lite 主题的位置。
pubsublite.project String 必需。包含 Pub/Sub 精简版主题。
pubsublite.topic String 必需。要发布的 Pub/Sub Lite 主题 Kafka 记录至。
topics String 必需。以英文逗号分隔的 Kafka 主题列表, 读取内容。

来源连接器配置选项

来源连接器支持以下配置选项。

设置 数据类型 说明
connector.class String 必填。连接器的 Java 类。对于 Pub/Sub Lite 源连接器,该值必须为 com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
gcp.credentials.file.path String 可选。存储 Google Cloud 凭据的文件的路径 用于对 Pub/Sub Lite 进行身份验证。
gcp.credentials.json String 可选。一个 JSON 数据块,用于包含用于对 Pub/Sub 精简版进行身份验证的 Google Cloud 凭据。
kafka.topic String 必需。从 Pub/Sub Lite 接收消息的 Kafka 主题。
pubsublite.location String 必需。Pub/Sub Lite 主题的位置。
pubsublite.partition_flow_control.bytes Long

每个 Pub/Sub Lite 分区的最大未完成字节数。

默认值:20000000

pubsublite.partition_flow_control.messages Long

每个 Pub/Sub Lite 分区的未完成消息数量上限。

默认值:Long.MAX_VALUE

pubsublite.project String 必填。包含 Pub/Sub Lite 主题的 Google Cloud 项目。
pubsublite.subscription String 必需。Pub/Sub Lite 的名称 从哪个订阅拉取消息。

后续步骤