Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S PubSubIO
d'Apache Beam.
Présentation
Pour écrire des données dans Pub/Sub, utilisez le connecteur PubSubIO
. Les éléments d'entrée peuvent être des messages Pub/Sub ou uniquement les données du message.
Si les éléments d'entrée sont des messages Pub/Sub, vous pouvez éventuellement définir des attributs ou une clé de tri sur chaque message.
Vous pouvez utiliser la version Java, Python ou Go du connecteur PubSubIO
, comme suit :
Java
Pour écrire sur un seul sujet, appelez la méthode PubsubIO.writeMessages
. Cette méthode utilise une collection d'objets PubsubMessage
. Le connecteur définit également des méthodes pratiques pour écrire des chaînes, des messages Avro encodés en binaire ou des messages protobuf encodés en binaire. Ces méthodes convertissent la collection d'entrée en messages Pub/Sub.
Pour écrire dans un ensemble dynamique de sujets en fonction des données d'entrée, appelez writeMessagesDynamic
. Spécifiez le sujet de destination pour chaque message en appelant PubsubMessage.withTopic
sur le message. Par exemple, vous pouvez acheminer les messages vers différents sujets en fonction de la valeur d'un champ particulier dans vos données d'entrée.
Pour plus d'informations, consultez la documentation de référence de PubsubIO
.
Python
Appelez la méthode pubsub.WriteToPubSub
.
Par défaut, cette méthode utilise une collection d'entrée de type bytes
, qui représente la charge utile du message. Si le paramètre with_attributes
est True
, la méthode utilise une collection d'objets PubsubMessage
.
Pour en savoir plus, consultez la documentation de référence du module pubsub
.
Go
Pour écrire des données dans Pub/Sub, appelez la méthode pubsubio.Write
. Cette méthode utilise une collection d'entrée d'objets PubSubMessage
ou de tranches d'octets contenant les charges utiles des messages.
Pour plus d'informations, consultez la documentation de référence du package pubsubio
.
Pour en savoir plus sur les messages Pub/Sub, consultez la section Format de message dans la documentation Pub/Sub.
Horodatages
Pub/Sub définit un code temporel sur chaque message. Ce code temporel représente l'heure à laquelle le message est publié dans Pub/Sub. Dans un scénario de streaming, vous pouvez également vous intéresser au code temporel de l'événement, qui correspond à l'heure à laquelle les données du message ont été générées. Vous pouvez utiliser l'horodatage des éléments Apache Beam pour représenter l'heure de l'événement. Les sources qui créent une PCollection
illimitée attribuent souvent un code temporel correspondant à l'heure de l'événement à chaque nouvel élément.
Pour Java et Python, le connecteur d'E/S Pub/Sub peut écrire l'horodatage de chaque élément en tant qu'attribut de message Pub/Sub. Les consommateurs de messages peuvent utiliser cet attribut pour obtenir le code temporel de l'événement.
Java
Appelez PubsubIO.Write<T>.withTimestampAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre timestamp_attribute
lorsque vous appelez WriteToPubSub
.
Distribution des messages
Dataflow prend en charge le traitement de type "exactement une fois" des messages dans un pipeline. Toutefois, le connecteur d'E/S Pub/Sub ne peut pas garantir la distribution de type "exactement une fois" des messages via Pub/Sub.
Pour Java et Python, vous pouvez configurer le connecteur d'E/S Pub/Sub pour qu'il écrive l'ID unique de chaque élément en tant qu'attribut de message. Les consommateurs de messages peuvent ensuite utiliser cet attribut pour dédupliquer les messages.
Java
Appelez PubsubIO.Write<T>.withIdAttribute
et spécifiez le nom de l'attribut.
Python
Spécifiez le paramètre id_label
lorsque vous appelez WriteToPubSub
.
Sortie directe
Si vous activez le mode de traitement en flux continu de type "au moins une fois" dans votre pipeline, le connecteur d'E/S utilise la sortie directe. Dans ce mode, le connecteur ne crée pas de point de contrôle des messages, ce qui permet d'accélérer les écritures. Toutefois, les nouvelles tentatives dans ce mode peuvent entraîner des messages en double avec des ID de message différents, ce qui peut rendre la suppression des doublons plus difficile pour les consommateurs de messages.
Pour les pipelines qui utilisent le mode "exactement une fois", vous pouvez activer la sortie directe en définissant l'option de service streaming_enable_pubsub_direct_output
. La sortie directe réduit la latence d'écriture et améliore le traitement. Choisissez cette option si vos consommateurs de messages peuvent gérer les messages en double avec des ID de message non uniques.
Examples
L'exemple suivant crée un PCollection
de messages Pub/Sub et les écrit dans un sujet Pub/Sub. Le sujet est spécifié en tant qu'option de pipeline. Chaque message contient des données de charge utile et un ensemble d'attributs.
Java
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Python
Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.