E/S Pub/Sub

Les transformations intégrées Read et Write pour Cloud Pub/Sub sont incluses dans PubsubIO. Vous pouvez utiliser PubsubIO pour lire des données depuis (ou écrire des données dans) un sujet ou un abonnement Cloud Pub/Sub. De plus, vous pouvez réaliser un traitement des flux de messages Cloud Pub/Sub avec une garantie d'unicité, car PubsubIO supprime les doublons parmi les messages à partir des identifiants de message personnalisés ou des identifiants attribués par Cloud Pub/Sub.

Remarque : Par défaut, les transformations PubsubIO créent des PCollections illimitées. Une fois que vous avez créé une PCollection à l'aide de PubsubIO.Read, vous devez appliquer des règles de fenêtrage à cette PCollection avant d'appliquer toute transformation regroupant des éléments, telle que GroupByKey ou Combine.

Spécifier un sujet ou un abonnement Cloud Pub/Sub

Pour utiliser PubsubIO, vous pouvez fournir soit le nom d'un sujet Cloud Pub/Sub, soit un abonnement que vous avez préalablement créé pour un sujet donné. Si vous utilisez une transformation PubsubIO avec un nom de sujet, Dataflow crée et gère automatiquement un abonnement pour vous en coulisses.

Spécifier un sujet Cloud Pub/Sub

Lorsque vous utilisez PubsubIO, vous pouvez choisir de spécifier un sujet Cloud Pub/Sub à utiliser pour lire ou écrire des données. Lorsque vous fournissez un nom de sujet, Dataflow crée automatiquement un abonnement à ce sujet pour vous. Dataflow commence à lire des données dans le sujet à partir du moment où l'exécution de votre pipeline commence. Toutes les données publiées dans le sujet avant le démarrage de votre pipeline seront indisponibles.

Remarque : Dataflow crée l'abonnement Cloud Pub/Sub requis dans le cadre de la configuration de votre pipeline. Les temps de configuration du pipeline pouvant varier suivant le nombre d'instances Compute Engine et les autres ressources allouées dans Cloud Platform, il peut être difficile d'identifier exactement quand votre pipeline commence à lire les données dans le sujet Cloud Pub/Sub. Si vous avez besoin d'un contrôle plus précis sur le timing de lecture et d'écriture des données, vous pouvez créer et gérer votre propre abonnement à un sujet Cloud Pub/Sub et le transmettre à PubsubIO. Pour plus d'informations, reportez-vous à la section Spécifier un abonnement Cloud Pub/Sub.

Le nom de sujet que vous renseignez doit suivre le format projects/<Cloud Platform Project Name>/topics/<topic name>, où "Project Name" correspond au nom du projet propriétaire du sujet. De plus, le nom du sujet doit répondre aux exigences suivantes :

  • Le nom du sujet doit comporter entre 3 et 255 caractères.
  • Le nom du sujet ne peut contenir que des lettres minuscules, des chiffres, des tirets ("-"), des traits de soulignement ("_") et des points (".").
  • Le nom du sujet doit commencer par une lettre.
  • Le nom du sujet doit se terminer par une lettre ou un chiffre.
  • Le nom du sujet ne peut pas commencer par le préfixe "goog".

Spécifier un abonnement Cloud Pub/Sub

Lorsque vous utilisez une transformation PubsubIO, vous pouvez choisir de spécifier un abonnement Cloud Pub/Sub à utiliser pour la lecture ou l'écriture de données dans un sujet Cloud Pub/Sub donné. Pour utiliser un abonnement, vous devez avoir préalablement créé et géré vous-même cet abonnement. Pour plus d'informations à ce sujet, consultez la Documentation relative aux abonnements Cloud Pub/Sub

Le nom d'abonnement que vous renseignez doit suivre le format /projects/<Cloud Platform Project Name>/subscriptions/<subscription name>, où "Project name" correspond au nom du projet propriétaire de l'abonnement. De plus, le nom de l'abonnement doit répondre aux exigences suivantes :

  • Le nom de l'abonnement doit comporter entre 3 et 255 caractères.
  • Le nom de l'abonnement ne peut contenir que des lettres minuscules, des chiffres, des tirets ("-"), des traits de soulignement ("_") et des points (".").
  • Le nom de l'abonnement doit commencer par une lettre.
  • Le nom de l'abonnement doit se terminer par une lettre ou un chiffre.
  • Le nom de l'abonnement ne peut pas commencer par le préfixe "goog".

Utilisez un abonnement si vous voulez garantir que votre pipeline ne manque aucune donnée de votre sujet Cloud Pub/Sub. Un abonnement Cloud Pub/Sub continue à collecter des données même si aucun pipeline ne les lit. Lorsque votre pipeline se lance, il a accès à toutes les données collectées par l'abonnement, y compris celles arrivées avant le démarrage du pipeline. De même, si vous arrêtez votre pipeline et créez ultérieurement un nouveau pipeline lisant les données à partir du même abonnement, aucune donnée ne sera perdue, car l'abonnement continuera à collecter des données en l'absence de tout pipeline.

Lire avec PubsubIO

La transformation PubsubIO.Read lit en continu les données d'un flux Cloud Pub/Sub et renvoie une PCollection illimitée de chaînes String représentant les données de ce flux. Par défaut, chaque élément de la PCollection résultante est encodé sous forme de chaîne de caractères UTF-8. Vous pouvez modifier l'encodage par défaut en utilisant .withCoder lorsque vous appelez PubsubIO.Read.

Java

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  // streamData is Unbounded; apply windowing afterward.
  PCollection<String> streamData =
    p.apply(PubsubIO.Read.named("ReadFromPubsub")
                       .topic("/topics/my-topic"));

Lire un ensemble limité d'enregistrements dans Cloud Pub/Sub

À des fins de test, vous pouvez lire des données dans Cloud Pub/Sub avec InProcessPipelineRunner (SDK Dataflow pour Java 1.X) ou DirectRunner (SDK Dataflow SDK pour Java 2.X).

Si, pour vos tests, vous devez travailler sur une collection limitée, vous pouvez définir une limite sur la quantité de données d'entrée à lire. Vous pouvez utiliser l'option .maxNumRecords pour lire un nombre maximal fixe d'enregistrements, ou utiliser .maxReadTime pour lire des enregistrements pendant une durée déterminée.

Notez que, dans ce mode, la déduplication, la répétition de tentatives et la récupération en cas d'échec ne sont pas garanties. Pour la production, vous devez traiter Cloud Pub/Sub comme une source illimitée.

Écrire avec PubsubIO

La transformation PubsubIO.Write écrit en continu une PCollection illimitée d'objets String dans un flux Cloud Pub/Sub. Par défaut, la PCollection fournie en entrée de PubsubIO.Write doit contenir des chaînes encodées en UTF-8. Vous pouvez modifier le type d'entrées attendu et leur encodage à l'aide de withCoder.

Java

  // streamData is Unbounded.
  PCollection<String> streamData = ...;
  streamData.apply(PubsubIO.Write.named("WriteToPubsub")
                       .topic("/topics/my-topic"));

Horodatages et ID d'enregistrements

Vous pouvez ajouter deux types de métadonnées aux enregistrements que vous lisez ou écrivez à l'aide de PubsubIO : les horodatages et les ID d'enregistrement.

Utiliser des horodatages spécifiés par l'utilisateur

Vous pouvez utiliser des horodatages que vous spécifiez vous-même pour contrôler plus précisément la manière dont les éléments lus dans Cloud Pub/Sub sont assignés aux fenêtres d'un pipeline Dataflow. Pour définir ce type d'horodatage, invoquez timestampLabel lorsque vous construisez vos transformations PubsubIO.Read ou PubsubIO.Write et passez-lui une valeur de chaîne de votre choix.

Si vous spécifiez vous-même un libellé d'horodatage lorsque vous exploitez PubsubIO.Read, la transformation utilise, lors de la lecture d'éléments dans Cloud Pub/Sub, la valeur de l'attribut correspondant au nom de la chaîne que vous avez transmise à timestampLabel comme horodatage pour chaque message entrant. Les horodatages doivent être soit exprimés en nombre de millisecondes écoulées depuis l'époque Unix, soit mis en forme conformément à la RFC 3339.

Si vous définissez vous-même un libellé d'horodatage lorsque vous utilisez PubsubIO.Write, la transformation écrit chaque élément en tant que message Cloud Pub/Sub avec un attribut portant le nom que vous avez fourni et possédant comme valeur l'horodatage de l'élément exprimé en millisecondes écoulées depuis l'époque Unix.

Utiliser les ID d'enregistrement

Les ID d’enregistrement permettent de garantir l'unicité de traitement à l'interface entre Dataflow et d’autres systèmes. Pour utiliser les ID d'enregistrement, invoquez idLabel lorsque vous construisez vos transformations PubsubIO.Read ou PubsubIO.Write et passez-lui une valeur de chaîne de votre choix.

Si vous avez défini un libellé d'ID d'enregistrement lors de l'utilisation de PubsubIO.Read, Dataflow supprimera tous les messages sauf un lorsqu'il recevra plusieurs messages ayant le même ID (cet ID sera lu à partir de l'attribut portant le nom de la chaîne que vous avez transmise à idLabel). Toutefois, Dataflow n'effectue pas cette déduplication pour des messages comportant la même valeur d'ID d'enregistrement s'ils sont publiés dans Cloud Pub/Sub à plus de 10 minutes d'intervalle.

Si vous définissez un libellé d'ID d'enregistrement lorsque vous utilisez PubsubIO.Write, la transformation ajoute à tous les messages sortants un attribut portant le nom que vous avez spécifié et possédant une valeur unique. Les systèmes en aval peuvent utiliser cette valeur unique pour dédupliquer les messages.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.