本文档介绍了如何使用 Apache Beam PubSubIO
I/O 连接器将文本数据从 Dataflow 写入 Pub/Sub。
概览
如需将数据写入 Pub/Sub,请使用 PubSubIO
连接器。输入元素可以是 Pub/Sub 消息,也可以只是消息数据。如果输入元素是 Pub/Sub 消息,则可以选择在每条消息上设置属性或排序键。
您可以使用 Java、Python 或 Go 版本的 PubSubIO
连接器,如下所示:
Java
如需写入单个主题,请调用 PubsubIO.writeMessages
方法。此方法接受 PubsubMessage
对象的输入集合。连接器还定义了用于写入字符串、二进制编码的 Avro 消息或二进制编码的 protobuf 消息的便捷方法。这些方法将输入集合转换为 Pub/Sub 消息。
要根据输入数据向一组动态主题写入数据,请调用 writeMessagesDynamic
。通过对消息调用 PubsubMessage.withTopic
来为每条消息指定目标主题。例如,您可以根据输入数据中特定字段的值将消息路由到不同的主题。
如需了解详情,请参阅 PubsubIO
参考文档。
Python
调用 pubsub.WriteToPubSub
方法。
默认情况下,此方法接受 bytes
类型的输入集合,该集合表示消息载荷。如果 with_attributes
参数为 True
,则该方法接受 PubsubMessage
对象的集合。
如需了解详情,请参阅 pubsub
模块参考文档。
Go
如需将数据写入 Pub/Sub,请调用 pubsubio.Write
方法。此方法接受 PubSubMessage
对象或包含消息载荷的字节切片的输入集合。
如需了解详情,请参阅 pubsubio
软件包参考文档。
如需详细了解 Pub/Sub 消息,请参阅 Pub/Sub 文档中的消息格式。
。时间戳
Pub/Sub 会为每条消息设置时间戳。此时间戳表示消息发布到 Pub/Sub 的时间。在流处理场景中,您可能还需要关注事件时间戳,即生成消息数据的时间。您可以使用 Apache Beam 元素时间戳来表示事件时间。创建无界限 PCollection
的源通常会为每个新元素分配一个与事件时间对应的时间戳。
对于 Java 和 Python,Pub/Sub I/O 连接器可以将每个元素的时间戳写入为 Pub/Sub 消息属性。消息使用者可以使用此属性来获取事件时间戳。
Java
调用 PubsubIO.Write<T>.withTimestampAttribute
并指定属性的名称。
Python
在调用 WriteToPubSub
时指定 timestamp_attribute
参数。
邮件递送
Dataflow 支持对一个流水线中的消息进行一次性处理。但是,Pub/Sub I/O 连接器无法保证通过 Pub/Sub 一次性递送消息。
对于 Java 和 Python,您可以配置 Pub/Sub I/O 连接器,以将每个元素的唯一 ID 写入为消息属性。然后,消息使用方可以使用此属性来删除重复的消息。
Java
调用 PubsubIO.Write<T>.withIdAttribute
并指定属性的名称。
Python
在调用 WriteToPubSub
时指定 id_label
参数。
示例
以下示例创建由 Pub/Sub 消息组成的 PCollection
,并将其写入 Pub/Sub 主题。该主题被指定为流水线选项。每条消息都包含载荷数据和一组属性。
Java
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
Python
如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。