本页面介绍了如何向精简版主题发布消息。您可以使用 Java 版 Pub/Sub Lite 客户端库发布消息。
在发布精简版主题的消息并创建精简版订阅后,您可以从精简版订阅中接收消息。
消息格式
消息由包含消息数据和元数据的字段组成。 在消息中指定以下任一项:
客户端库自动将消息分配给分区,Pub/Sub Lite 服务会向消息中添加以下字段:
- 分区中唯一的消息 ID
- Pub/Sub Lite 服务将消息存储在分区中的时间的时间戳
发布消息
要发布消息,请请求串流连接到精简版主题,然后通过串流连接发送消息。
以下示例显示了如何将消息发布到精简版主题:
gcloud
此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
如需发布消息,请使用 gcloud pubsub lite-topics publish 命令:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA
请替换以下内容:
- TOPIC_ID:精简版主题的 ID
- LITE_LOCATION:精简版主题的位置
- MESSAGE_DATA:包含消息数据的字符串
Go
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Go 设置说明进行操作。
Java
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。
Python
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。
客户端库会异步发送消息并处理错误。如果发生错误,则客户端库会再次发送消息。
- Pub/Sub Lite 服务会关闭信息流。
- 客户端库会缓冲消息并重新建立到精简版主题的连接。
- 客户端库按顺序发送消息。
发布消息后,Pub/Sub Lite 服务会将消息存储在分区中,并将消息 ID 返回给发布者。
使用排序键
如果消息具有相同的排序键,则客户端库会将消息分配给同一分区。排序键必须最多为 1024 个字节的字符串。
排序键位于消息的 key
字段中。您可以使用客户端库设置排序键。
gcloud
此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
如需发布消息,请使用 gcloud pubsub lite-topics publish 命令:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--ordering-key=ORDERING_KEY \
--message=MESSAGE_DATA
请替换以下内容:
- TOPIC_ID:精简版主题的 ID
- LITE_LOCATION:精简版主题的位置
- ORDERING_KEY:用于向分区分配消息的字符串
- MESSAGE_DATA:包含消息数据的字符串
Go
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Go 设置说明进行操作。
Java
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。
Python
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。
您可以使用排序键将多条消息发送到同一分区,以便订阅者按顺序接收消息。客户端库可能会为同一分区分配多个排序键。
设置事件时间
您可以使用事件时间来发布 Lite 消息。事件时间是自定义的 属性。
您可以使用客户端库或 gcloud CLI 设置事件时间戳。
此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
如需发布消息,请使用 gcloud pubsub lite-topics publish 命令:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--event-time=EVENT_TIME \
--message=MESSAGE_DATA
请替换以下内容:
TOPIC_ID:精简版主题的 ID
LITE_LOCATION:精简版主题的位置
EVENT_TIME:用户指定的事件时间。更多信息 如需了解时间格式,请运行
gcloud topic datetimes
。MESSAGE_DATA:包含消息数据的字符串
使用属性
消息属性是包含消息相关元数据的键值对。属性可以是文本或字节字符串。
这些属性位于消息的 attributes
字段中。您可以使用客户端库设置属性。
gcloud
此命令需要使用 Python 3.6 或更高版本,并且需要安装 grpcio Python 软件包。对于 MacOS、Linux 和 Cloud Shell 用户,请运行以下命令:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
如需发布消息,请使用 gcloud pubsub lite-topics publish 命令:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA \
--attribute=KEY=VALUE,...
请替换以下内容:
Go
在运行此示例之前,请按照 Pub/Sub Lite 客户端库。
Java
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。
Python
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。
属性可以指示如何处理消息。订阅者可以解析消息的 attributes
字段,并根据其属性处理消息。
批处理消息
客户端库批量发布消息。较大的批次会占用较少的计算资源,但会增加延迟时间。您可以通过批处理设置更改批次大小。
下表列出了您可以配置的批处理设置:
设置 | 说明 | 默认 |
---|---|---|
请求大小 | 批次的大小上限(以字节为单位)。 | 3.5 MiB |
消息数量 | 一批次的最大消息数。 | 1000 封信息 |
发布延迟 | 向一批次添加消息与向精简版主题发送一批次消息之间的时间量(以毫秒为单位)。 | 50 毫秒 |
您可以使用客户端库配置批处理设置。
Go
在运行此示例之前,请按照 Pub/Sub Lite 客户端库。
Java
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。
Python
在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。
发布者应用启动时,客户端库会为精简版主题中的每个分区创建一个批次。例如,如果精简版主题有两个分区,则发布者会创建两个批次并将每个批次发送到一个分区。
发布消息后,客户端库会对其进行缓冲,直到批量超出请求大小上限、消息数上限或发布延迟时间。
为消息排序
精简版主题会按照您发布消息的时间顺序对每个分区中的消息进行排序。要向同一分区分配消息,请使用排序键。
Pub/Sub Lite 按顺序传送来自分区的消息,订阅者可以按顺序处理消息。如需了解详情,请参阅接收消息。
发布幂等性
从以下版本开始,Pub/Sub Lite 客户端库支持幂等发布:
- java-pubsublite:版本 1.10.0。
- python-pubsublite:版本 1.8.0。
- google-cloud-go:pubsublite 版本 1.7.0。
如果由于网络或服务器错误而重新尝试发布消息, 只存储一次。幂等性只能在同一个会话内得到保证;它 无法保证通过新的发布者重新发布同一条消息 客户端。此操作不会产生任何额外的服务费用,也不会增加发布延迟时间。
启用或停用幂等发布
Pub/Sub Lite 客户端默认启用幂等发布 库。可使用 客户端库。
如果启用了幂等发布,则发布结果中返回的偏移量
可能是 -1
。如果消息被识别为已成功发布的消息的副本,但服务器在发布时没有足够的信息来返回消息的偏移量,则会返回此值。订阅者收到的消息始终具有有效的偏移量。
问题排查
收到重复的联系人
由于幂等性仅限于单个会话,因此如果您重新创建发布商客户端以发布相同的消息,可能会收到重复消息。
如果 Pub/Sub Lite 服务自动向订阅者分配分区(默认设置),订阅者客户端可能会多次接收同一消息。消息可能会重新传送给其他订阅者 在发生重新分配时通知客户。
发布商错误
发布方会话的状态是指 无活动。如果会话在此时间段之后恢复,发布方客户端会终止并显示类似于“Failed Precondition: Expected message to have publish sequence number of...”的错误消息,并且不会接受新消息。重新创建发布商客户端以解决此错误。