Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S PubSubIO
d'Apache Beam.
Présentation
Pour écrire des données dans Pub/Sub, utilisez le connecteur PubSubIO
. Les éléments d'entrée peuvent être des messages Pub/Sub ou simplement les données du message.
Si les éléments d'entrée sont des messages Pub/Sub, vous pouvez éventuellement définir des attributs ou une clé de tri pour chaque message.
Vous pouvez utiliser la version Java, Python ou Go du connecteur PubSubIO
, comme suit :
Java
Pour écrire dans un seul sujet, appelez la méthode PubsubIO.writeMessages
. Cette méthode utilise une collection d'entrée d'objets PubsubMessage
. Le connecteur définit également des méthodes pratiques pour écrire des chaînes, des messages Avro encodés en binaire ou des messages protobuf encodés en binaire. Ces méthodes convertissent la collection d'entrée en messages Pub/Sub.
Pour écrire dans un ensemble dynamique de sujets basé sur les données d'entrée, appelez writeMessagesDynamic
. Spécifiez le sujet de destination de chaque message en appelant PubsubMessage.withTopic
sur le message. Par exemple, vous pouvez acheminer des messages vers différents sujets en fonction de la valeur d'un champ particulier dans vos données d'entrée.
Pour plus d'informations, consultez la documentation de référence de PubsubIO
.
Python
Appelez la méthode pubsub.WriteToPubSub
.
Par défaut, cette méthode utilise une collection d'entrée de type bytes
, qui représente la charge utile du message. Si le paramètre with_attributes
est True
, la méthode utilise une collection d'objets PubsubMessage
.
Pour en savoir plus, consultez la documentation de référence du module pubsub
.
Go
Pour écrire des données dans Pub/Sub, appelez la méthode pubsubio.Write
. Cette méthode prend une collection d'entrée d'objets PubSubMessage
ou de tranches d'octets contenant les charges utiles du message.
Pour plus d'informations, consultez la documentation de référence du package pubsubio
.
Pour en savoir plus sur les messages Pub/Sub, consultez la section Format de message dans la documentation Pub/Sub.
Horodatages
Pub/Sub définit un horodatage pour chaque message. Cet horodatage représente l'heure à laquelle le message est publié dans Pub/Sub. Dans un scénario de traitement par flux, vous pouvez également vous soucier de l'horodatage de l'événement, qui correspond à l'heure à laquelle les données du message ont été générées. Vous pouvez utiliser l'horodatage d'élément Apache Beam pour représenter l'heure de l'événement. Les sources qui créent une PCollection
illimitée attribuent souvent à chaque nouvel élément un horodatage correspondant à l'heure de l'événement.
Pour Java et Python, le connecteur d'E/S Pub/Sub peut écrire l'horodatage de chaque élément en tant qu'attribut de message Pub/Sub. Les clients de messages peuvent utiliser cet attribut pour obtenir l'horodatage de l'événement.
Java
Appelez PubsubIO.Write<T>.withTimestampAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre timestamp_attribute
lorsque vous appelez WriteToPubSub
.
Distribution des messages
Dataflow accepte le traitement de type "exactement une fois" des messages au sein d'un pipeline. Toutefois, le connecteur d'E/S Pub/Sub ne peut pas garantir une distribution "exactement une fois" des messages via Pub/Sub.
Pour Java et Python, vous pouvez configurer le connecteur d'E/S Pub/Sub pour écrire l'ID unique de chaque élément en tant qu'attribut de message. Les clients de messages peuvent ensuite utiliser cet attribut pour dédupliquer les messages.
Java
Appelez PubsubIO.Write<T>.withIdAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre id_label
lorsque vous appelez WriteToPubSub
.
Exemples
L'exemple suivant crée un PCollection
de messages Pub/Sub et les écrit dans un sujet Pub/Sub. Le sujet est spécifié en tant qu'option de pipeline. Chaque message contient des données de charge utile et un ensemble d'attributs.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Python
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.