Questo documento descrive come scrivere dati di testo da Dataflow a Pub/Sub utilizzando il connettore I/O PubSubIO
di Apache Beam.
Panoramica
Per scrivere dati in Pub/Sub, utilizza il connettore PubSubIO
. Gli elementi di input possono essere messaggi Pub/Sub o solo i dati dei messaggi.
Se gli elementi di input sono messaggi Pub/Sub, puoi facoltativamente impostare attributi o una chiave di ordinamento per ogni messaggio.
Puoi utilizzare la versione Java, Python o Go del connettore PubSubIO
, come segue:
Java
Per scrivere in un singolo argomento, chiama il metodo PubsubIO.writeMessages
. Questo metodo prende una raccolta di input di PubsubMessage
oggetti. Il connettore definisce anche dei metodi pratici per la scrittura di stringhe, messaggi Avro con codifica binaria o messaggi protobuf con codifica binaria. Questi metodi convertono la raccolta di input
in messaggi Pub/Sub.
Per scrivere in un insieme dinamico di argomenti in base ai dati di input, chiama writeMessagesDynamic
. Specifica l'argomento di destinazione per ogni messaggio chiamando PubsubMessage.withTopic
sul messaggio. Ad esempio, puoi eseguire il routing dei messaggi a diversi argomenti in base al valore di un determinato campo nei dati di input.
Per saperne di più, consulta la documentazione di riferimento di PubsubIO
.
Python
Chiama il metodo pubsub.WriteToPubSub
.
Per impostazione predefinita, questo metodo prende una raccolta di input di tipo bytes
, che rappresenta il payload del messaggio. Se il parametro with_attributes
è
True
, il metodo richiede una raccolta di PubsubMessage
oggetti.
Per ulteriori informazioni, consulta la documentazione di riferimento del modulo pubsub
.
Go
Per scrivere dati in Pub/Sub, chiama il metodo pubsubio.Write
. Questo metodo accetta una raccolta di input di oggetti PubSubMessage
o di sezioni di byte contenenti i payload dei messaggi.
Per ulteriori informazioni, consulta la documentazione di riferimento del pacchetto pubsubio
.
Per ulteriori informazioni sui messaggi Pub/Sub, consulta Formato del messaggio nella documentazione di Pub/Sub.
Timestamp
Pub/Sub imposta un timestamp su ciascun messaggio. Questo timestamp rappresenta il momento in cui il messaggio viene pubblicato in Pub/Sub. In uno scenario di streaming, potrebbe interessarti anche il timestamp dell'evento, ovvero l'ora in cui sono stati generati i dati del messaggio. Puoi utilizzare il timestamp dell'elemento Apache Beam per rappresentare l'ora dell'evento. Le origini che creano un PCollection
illimitato spesso assegnano a ogni nuovo elemento un timestamp che corrisponde all'ora dell'evento.
Per Java e Python, il connettore I/O Pub/Sub può scrivere il timestamp di ogni elemento come attributo del messaggio Pub/Sub. I consumatori dei messaggi possono utilizzare questo attributo per ottenere il timestamp dell'evento.
Java
Richiama PubsubIO.Write<T>.withTimestampAttribute
e specifica il nome
dell'attributo.
Python
Specifica il parametro timestamp_attribute
quando chiami WriteToPubSub
.
Consegna messaggi
Dataflow supporta l'elaborazione "exactly-once" dei messaggi all'interno di una pipeline. Tuttavia, il connettore I/O Pub/Sub non può garantire la consegna "exactly-once" dei messaggi tramite Pub/Sub.
Per Java e Python, puoi configurare il connettore I/O Pub/Sub in modo che scriva l'ID univoco di ogni elemento come attributo del messaggio. I consumer dei messaggi possono quindi utilizzare questo attributo per deduplicare i messaggi.
Java
Richiama PubsubIO.Write<T>.withIdAttribute
e specifica il nome
dell'attributo.
Python
Specifica il parametro id_label
quando chiami WriteToPubSub
.
Esempi
L'esempio seguente crea un messaggio PCollection
di messaggi Pub/Sub e li scrive in un argomento Pub/Sub. L'argomento è specificato
come opzione della pipeline. Ogni messaggio contiene i dati del payload e un insieme di attributi.
Java
Per eseguire l'autenticazione in Dataflow, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.
Python
Per eseguire l'autenticazione in Dataflow, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.