En este documento, se describe cómo escribir datos de texto de Dataflow a Pub/Sub mediante el conector de E/S PubSubIO
de Apache Beam.
Descripción general
Para escribir datos en Pub/Sub, usa el conector PubSubIO
. Los elementos de entrada pueden ser mensajes de Pub/Sub o solo los datos del mensaje.
Si los elementos de entrada son mensajes de Pub/Sub, puedes establecer atributos o una clave de ordenamiento en cada mensaje de forma opcional.
Puedes usar la versión de Java, Python o Go del conector PubSubIO
de la siguiente manera:
Java
Para escribir en un solo tema, llama al método PubsubIO.writeMessages
. Este método toma una colección de entrada de objetos PubsubMessage
. El conector también define métodos convenientes para escribir cadenas, mensajes Avro codificados en formato binario o mensajes protobuf codificados en formato binario. Estos métodos convierten la colección de entradas en mensajes de Pub/Sub.
Para escribir en un conjunto dinámico de temas según los datos de entrada, llama a writeMessagesDynamic
. Especifica el tema de destino de cada mensaje mediante una llamada a PubsubMessage.withTopic
en el mensaje. Por ejemplo, puedes enrutar mensajes a diferentes temas según el valor de un campo particular en tus datos de entrada.
Para obtener más información, consulta la documentación de referencia PubsubIO
.
Python
Llama al método pubsub.WriteToPubSub
.
De forma predeterminada, este método toma una colección de entrada de tipo bytes
que representa la carga útil del mensaje. Si el parámetro with_attributes
es True
, el método toma una colección de objetos PubsubMessage
.
Para obtener más información, consulta la documentación de referencia del módulo pubsub
.
Go
Para escribir datos en Pub/Sub, llama al método pubsubio.Write
. Este método toma una colección de entrada de objetos PubSubMessage
o porciones de bytes que contienen las cargas útiles del mensaje.
Para obtener más información, consulta la documentación de referencia paquete pubsubio
.
Para obtener más información sobre los mensajes de Pub/Sub, consulta Formato del mensaje en la documentación de Pub/Sub.
Marcas de tiempo
Pub/Sub establece una marca de tiempo en cada mensaje. Esta marca de tiempo representa la hora en la que se publica el mensaje en Pub/Sub. En una situación de transmisión, también puede importar la marca de tiempo del evento, que es la hora en que se generaron los datos del mensaje. Puedes usar la marca de tiempo del elemento de Apache Beam para representar la hora del evento. Las fuentes que crean una PCollection
no delimitada a menudo le asignan una marca de tiempo a cada elemento nuevo que corresponde a la hora del evento.
Para Java y Python, el conector de E/S de Pub/Sub puede escribir la marca de tiempo de cada elemento como un atributo de mensaje de Pub/Sub. Los consumidores de mensajes pueden usar este atributo para obtener la marca de tiempo del evento.
Java
Llama a PubsubIO.Write<T>.withTimestampAttribute
y especifica el nombre del atributo.
Python
Especifica el parámetro timestamp_attribute
cuando llames a WriteToPubSub
.
Entrega de mensajes
Dataflow admite el procesamiento de mensajes exactamente una vez de mensajes dentro de una canalización. Sin embargo, el conector de E/S de Pub/Sub no puede garantizar la entrega de mensajes exactamente una vez a través de Pub/Sub.
Para Java y Python, puedes configurar el conector de E/S de Pub/Sub a fin de escribir el ID único de cada elemento como un atributo de mensaje. Luego, los consumidores de mensajes pueden usar este atributo para anular la duplicación de mensajes.
Java
Llama a PubsubIO.Write<T>.withIdAttribute
y especifica el nombre del atributo.
Python
Especifica el parámetro id_label
cuando llames a WriteToPubSub
.
Ejemplos
En el siguiente ejemplo, se crea una PCollection
de mensajes de Pub/Sub y los escribe en un tema de Pub/Sub. El tema se especifica como una opción de canalización. Cada mensaje contiene datos de carga útil y un conjunto de atributos.
Java
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
Python
Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.