Sources et récepteurs personnalisés

Les SDK Dataflow fournissent une API extensible qui vous permet de créer des sources et récepteurs de données personnalisés. Ces derniers sont requis si vous souhaitez que votre pipeline lise des données (ou en écrive) à partir d'une source ou d'un récepteur de données qui n'est pas compatible de manière native avec les SDK Dataflow.

Pour créer une source personnalisée, vous devez étendre les sous-classes Source abstraites du SDK Dataflow, telles que BoundedSource ou UnboundedSource. Pour créer un récepteur personnalisé, vous devez étendre la classe de base Sink du SDK Dataflow. L'API extensible vous permet de créer des sources personnalisées qui lisent des données limitées (lots) ou illimitées (flux), ainsi que des récepteurs qui n'écrivent que des données limitées.

Dataflow prévoit d'ajouter la compatibilité avec les récepteurs personnalisés qui écrivent des données illimitées dans une prochaine version.

Exigences de base relatives au code des sources et récepteurs personnalisés

Le service Dataflow exploite les classes que vous fournissez pour lire et/ou écrire des données à l'aide de plusieurs instances de nœuds de calcul exécutées en parallèle. Par conséquent, le code que vous fournissez pour les sous-classes Source et Sink doit respecter certaines exigences de base :

Sérialisabilité

Votre sous-classe Source ou Sink, qu'elle soit bornée ou non, doit être Serializable. Le service Dataflow peut créer plusieurs instances de votre sous-classe Source ou Sink pour les envoyer à plusieurs collaborateurs distants afin de faciliter la lecture ou l'écriture en parallèle.

Immuabilité

Votre Source ou Sink sous-classe doit être effectivement immuable. Tous les champs privés doivent être déclarés final, et toutes les variables privées du type de collection doivent être effectivement immuables. Si votre classe dispose de mutateurs, ceux-ci doivent renvoyer une copie indépendante de l'objet avec le champ correspondant modifié.

Vous ne devez utiliser l'état mutable dans votre sous-classe Source ou Sink que si vous utilisez une évaluation paresseuse des calculs coûteux que vous devez implémenter. Dans ce cas, vous devez déclarer toutes les variables d'instance modifiables transient.

Sécurité

Si vous créez votre source personnalisée afin d'exploiter la fonctionnalité de rééquilibrage dynamique du travail de Dataflow, il est essentiel de rendre le code thread-safe. Le SDK Dataflow pour Java fournit une classe d'assistance qui vous aide à appliquer cette démarche. Pour en savoir plus, consultez la section Utiliser la classe BoundedSource avec le rééquilibrage dynamique du travail ci-dessous.

Testabilité

Il est primordial de tester de manière exhaustive l'ensemble de vos sous-classes Source et Sink, en particulier si vous développez vos classes avec des fonctionnalités avancées telles que Dynamic Work Rebalancing. Une erreur mineure lors de la mise en œuvre peut entraîner une corruption ou une perte de données (liée à des enregistrements ignorés ou dupliqués, par exemple) souvent difficile à détecter.

Pour vous aider à tester les sources, le SDK Dataflow fournit la classe SourceTestUtils. SourceTestUtils contient des utilitaires permettant de vérifier automatiquement certaines propriétés de votre mise en œuvre BoundedSource. Vous pouvez utiliser SourceTestUtils pour augmenter la couverture des tests de votre implémentation en utilisant un large éventail d'entrées avec relativement peu de lignes de code.

Créer une source personnalisée

Pour créer une source de données personnalisée pour votre pipeline, vous devez fournir la logique spécifique au format qui régit la façon dont le service Dataflow lit les données de votre source d'entrée et divise cette source en plusieurs parties pour permettre à plusieurs instances de nœuds de calcul de lire vos données en parallèle. Si vous créez une source de données personnalisée qui lit des données illimitées, vous devez fournir une logique supplémentaire pour gérer le filigrane de votre source et créer des points de contrôle facultatifs.

Pour fournir la logique pour votre source personnalisée, vous devez créer les classes suivantes :

  • Une sous-classe BoundedSource si vous souhaitez lire un ensemble de données limité (par lots), ou une sous-classe UnboundedSource si vous souhaitez lire un ensemble de données illimité (flux). Ces sous-classes décrivent les données que vous souhaitez lire, y compris leur emplacement et leurs paramètres (tels que la quantité de données à lire).
  • Sous-classe de la classe Dataflow SDK Source.Reader. Chaque Source doit être associé à un Reader qui capture tout l'état impliqué dans la lecture à partir de ce Source. Il peut s'agir d'opérations de gestion de fichiers, de connexions RPC et d'autres paramètres qui dépendent des exigences spécifiques relatives au format de données que vous souhaitez lire.

    La hiérarchie des classes Reader reflète la hiérarchie Source. Si vous étendez BoundedSource, vous devez fournir un BoundedReader ; si vous étendez UnboundedSource, vous devez fournir un UnboundedReader associé.

Mettre en œuvre la sous-classe Source

Vous devez créer une sous-classe de BoundedSource ou de UnboundedSource, selon que vos données sont un groupe fini ou un flux infini. Dans les deux cas, votre sous-classe Source doit remplacer les méthodes abstraites de la superclasse. Lorsque le service Dataflow utilise votre source de données personnalisée, il exploite ces méthodes pour estimer la taille de votre ensemble de données et le diviser pour la lecture en parallèle.

Votre sous-classe Source doit également gérer les informations de base relatives à votre source de données, telles que le lieu. Par exemple, l'exemple Source de mise en œuvre dans la classe DatastoreIO de Dataflow prend comme arguments les host, datasetID et query utilisés pour obtenir des données Datastore.

BoundedSource

BoundedSource représente un ensemble de données finies à partir duquel le service Dataflow peut lire, éventuellement en parallèle. BoundedSource contient un ensemble de méthodes abstraites utilisées par le service pour scinder l'ensemble de données afin de le lire avec plusieurs opérateurs distants.

Pour implémenter un BoundedSource, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • splitIntoBundles : le service Dataflow utilise cette méthode pour diviser vos données finies en groupes d'une taille donnée.
  • getEstimatedSizeBytes : le service Dataflow utilise cette méthode pour estimer la taille totale de vos données, en octets.
  • producesSortedKeys : méthode permettant de dire au service Dataflow si votre source génère des paires clé-valeur dans un ordre trié. Si votre source ne génère pas de paires clé/valeur, votre mise en œuvre de cette méthode doit renvoyer false.
  • createReader : crée le BoundedReader associé à ce BoundedSource.

Pour découvrir comment mettre en œuvre BoundedSourceBoundedSource et les méthodes abstraites requises, consultez l'exemple de mise en œuvre de BoundedSourceDatastoreIO du SDK Dataflow.

UnboundedSource

UnboundedSource représente un flux de données infini à partir duquel le service Dataflow peut lire, éventuellement en parallèle. UnboundedSource contient un ensemble de méthodes abstraites que le service utilise pour prendre en charge les lectures en streaming en parallèle ; y compris le point de contrôle pour la récupération après échec, l'ID d'enregistrement pour éviter la duplication de données, le filigranage pour estimer l'exhaustivité des données dans les parties en aval de votre pipeline.

Pour implémenter un UnboundedSource, votre sous-classe doit remplacer les méthodes abstraites suivantes :

Mettre en œuvre la sous-classe Reader

Vous devez créer une sous-classe de BoundedReader ou de UnboundedReader à renvoyer par la méthode createReader de votre sous-classe source. Le service Dataflow utilise les méthodes de votre Reader (borné ou non) pour effectuer la lecture réelle de votre ensemble de données.

BoundedReader et UnboundedReader ont des interfaces de base similaires, que vous devrez définir. En outre, il existe d'autres méthodes propres à UnboundedReader que vous devez mettre en œuvre pour travailler avec des données illimitées, et une méthode facultative que vous pouvez mettre en œuvre si vous souhaitez BoundedReader pour tirer parti de la fonctionnalité Rééquilibrage du travail dynamique de Dataflow. Il existe également des différences mineures dans la sémantique pour les méthodes start() et advance() lors de l'utilisation de UnboundedReader.

Méthodes du lecteur communes à BoundedReader et UnboundedReader

Dataflow utilise les méthodes suivantes pour lire les données en utilisant BoundedReader ou UnboundedReader :

  • start : initialise le Reader et avance jusqu'au premier enregistrement à lire. Cette méthode est appelée exactement une fois lorsque Dataflow commence à lire des données. Il est recommandé d'y intégrer les opérations coûteuses nécessaires à l'initialisation.
  • advance : avance le lecteur vers le prochain enregistrement valide. Cette méthode doit renvoyer false si aucune autre entrée n'est disponible. BoundedReader doit cesser de lire une fois que advance renvoie la valeur false, mais UnboundedReader peut renvoyer true dans les prochains appels une fois les données disponibles dans votre flux.
  • getCurrent : renvoie l'enregistrement à la position actuelle, lu en dernier lieu par start ou advance.
  • getCurrentTimestamp : renvoie l'horodatage de l'enregistrement de données actuel. Il vous suffit de remplacer getCurrentTimestamp si votre source lit des données associées à des horodatages intrinsèques. Le service Dataflow utilise cette valeur pour définir l'horodatage intrinsèque pour chaque élément dans la sortie résultante PCollection.

Méthodes du lecteur propres à UnboundedReader

En plus de l'interface de base Reader, UnboundedReader propose des méthodes supplémentaires pour gérer les lectures à partir d'une source de données illimitée :

  • getCurrentRecordId : renvoie un identifiant unique pour l'enregistrement en cours. Le service Dataflow filtre les enregistrements en double à l'aide de ces ID. Si des ID logiques sont présents dans chaque enregistrement de vos données, cette méthode vous permet de les renvoyer. Si ce n'est pas le cas, vous pouvez renvoyer un hachage (d'au moins 128 bits) du contenu de l'enregistrement. (Il est déconseillé d'utiliser Java Object.hashCode(), car un hachage 32 bits est généralement insuffisant pour éviter les collisions.)
  • Remarque : La mise en œuvre de getCurrentRecordId est facultative si votre source utilise un schéma de point de reprise qui identifie chaque enregistrement de manière unique. Toutefois, les ID d'enregistrement peuvent toujours s'avérer utiles si les systèmes en amont qui écrivent des données dans votre source produisent parfois des enregistrements en double pouvant ensuite être lus par cette dernière.

  • getWatermark : renvoie un filigrane fourni par votre Reader. Le watermark correspond à la limite inférieure approximative de l'horodatage des futurs éléments à lire par votre Reader. Le service Dataflow exploite le filigrane pour estimer l'exhaustivité des données. Les filigranes sont utilisés dans les fonctions de fenêtrage et de déclenchement de Dataflow.
  • getCheckpointMark : le service Dataflow utilise cette méthode pour créer un point de contrôle dans votre flux de données. Le point de reprise représente la progression de UnboundedReader, qui peut être utilisée pour la récupération après échec. Des flux de données différents peuvent utiliser différentes méthodes de création de points de contrôle. Certaines sources peuvent exiger la confirmation des enregistrements reçus, alors que d'autres peuvent recourir à la création de points de contrôle positionnels. Vous devez adapter cette méthode au schéma de création de points de contrôle le plus approprié. Par exemple, vous pouvez demander à cette méthode de renvoyer les enregistrements récemment confirmés.
  • Remarque : getCheckpointMark est facultatif. Vous n'avez pas besoin de le mettre en œuvre si vos données ne comportent pas de points de contrôle significatifs. Toutefois, si vous choisissez de ne pas mettre en œuvre la création de points de contrôle dans votre source, vous risquez de rencontrer des données en double dans votre pipeline ou d'en perdre, selon que votre source de données tente de renvoyer des enregistrements en cas d'erreur ou non.

Utiliser la classe BoundedSource avec le rééquilibrage dynamique du travail

Si votre source fournit des données limitées, vous pouvez faire en sorte que votre BoundedReader fonctionne avec la fonctionnalité Dynamic Work Rebalancing du service Dataflow en implémentant la méthode splitAtFraction. Le service Dataflow peut appeler splitAtFraction simultanément avec start ou advance sur un lecteur donné afin que les autres données de votre Source puissent être divisées et redistribuées aux autres collaborateurs.

Lorsque vous mettez en œuvre splitAtFraction, votre code doit produire un ensemble de divisions mutuellement exclusives où l'union de ces divisions correspond à l'ensemble de données total.

Classes de base Source et Reader

Le SDK Dataflow contient des classes de base abstraites pratiques pour vous aider à créer des classes Source et Reader compatibles avec les formats de stockage courants, tels que les fichiers.

FileBasedSource

Si votre source de données utilise des fichiers, vous pouvez déduire vos classes Source et Reader à partir des classes de base abstraites FileBasedSource et FileBasedReader dans le SDK Dataflow pour Java. FileBasedSource est une sous-classe source bornée qui met en œuvre le code commun aux sources Dataflow qui interagissent avec les fichiers, notamment :

  • l'expansion des modèles de fichier ;
  • la lecture séquentielle des enregistrements ;
  • les points de division.

XmlSource

Si votre source de données utilise des fichiers au format XML, vous pouvez déduire votre classe Source de la classe de base abstraite XmlSource dans le SDK Dataflow pour Java. XmlSource étend FileBasedSource et fournit des méthodes supplémentaires pour analyser les fichiers XML, comme définir les éléments XML qui désignent la racine du fichier et les enregistrements individuels dans le fichier.

Lire des données à partir d'une source personnalisée

Pour lire les données d'une source personnalisée dans votre pipeline, appliquez la transformation générique Read du SDK et transmettez votre source personnalisée en tant que paramètre à l'aide de l'opération .from :

Java

      MySource source = new MySource(false, file.getPath(), 64, null);
      p.apply("ReadFileData", Read.from(source))
    

Créer un récepteur personnalisé

Pour créer un récepteur de données personnalisé pour votre pipeline, vous devez indiquer la logique spécifique au format qui indique au service Dataflow comment écrire des données limitées des PCollection s du pipeline vers un récepteur, comme un répertoire ou un système de fichiers, une table de base de données, etc. Le service Dataflow écrit des groupes de données en parallèle à l'aide de plusieurs opérateurs.

Remarque : Dataflow n'est actuellement compatible qu'avec l'écriture de données illimitées dans un récepteur de sortie personnalisé.

Vous fournissez la logique d'écriture en créant les classes suivantes :

  • Une sous-classe de la classe de base abstraite Réservoir du SDK Dataflow. Sink décrit un emplacement ou une ressource à laquelle votre pipeline peut écrire en parallèle. Votre sous-classe Sink peut contenir des champs tels que l'emplacement de la ressource ou du fichier, le nom de la table de base de données, etc.
  • Sous-classe de Sink.WriteOperation. Sink.WriteOperation représente l'état d'une seule opération d'écriture en parallèle vers l'emplacement de sortie décrit dans votre Sink. Votre sous-classe WriteOperation doit définir les processus d'initialisation et de finalisation de l'écriture en parallèle.
  • Sous-classe de Sink.Writer. Sink.Writer écrit un groupe d'éléments à partir d'une entrée PCollection vers le collecteur de données désigné.

Mettre en œuvre la sous-classe Sink

Votre sous-classe Sink décrit l'emplacement ou la ressource vers laquelle votre pipeline écrit ses résultats. Cela peut inclure un système de fichiers, un tableau de données ou un ensemble de données, etc. Votre sous-classe Sink doit valider que l'emplacement de sortie peut être écrit et créer WriteOperation s qui définissent comment écrire des données dans cet emplacement de sortie.

Pour mettre en œuvre un Sink, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • validate : cette méthode garantit que l'emplacement de sortie des données du pipeline est valide et qu'il est possible d'y écrire. validate doit s'assurer que le fichier peut être ouvert, que le répertoire de sortie existe, que l'utilisateur dispose d'autorisations d'accès pour la table de base de données, etc. Le service Dataflow appelle validate lors de la création du pipeline.
  • createWriteOperation : cette méthode crée un objet Sink.WriteOperation qui définit comment écrire dans l'emplacement de sortie.

Mettre en œuvre la sous-classe WriteOperation

Votre sous-classe WriteOperation définit comment écrire un groupe d'éléments à l'emplacement de sortie défini dans votre Sink. Le WriteOperation effectue l'initialisation et la finalisation nécessaires pour une écriture en parallèle.

Pour mettre en œuvre un WriteOperation, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • initialize : cette méthode effectue toute initialisation nécessaire avant d'écrire dans l'emplacement de sortie. Le service Dataflow appelle cette méthode avant le début de l'écriture. Vous pouvez utiliser initialize pour, par exemple, créer un répertoire de sortie temporaire.
  • finalize : cette méthode gère les résultats d'une écriture effectuée par une classe Writer. Votre mise en œuvre de finalize doit effectuer le nettoyage des écritures ou des écritures échouées qui ont été relancées et doit pouvoir localiser toute sortie temporaire ou partielle écrite par des échecs d'écriture.

    Comme finalize peut être appelé plusieurs fois en cas d'échec ou de nouvelle tentative, une bonne pratique consiste à effectuer votre implémentation de finalize atomic ; si cela n'est pas possible, vous devez mettre en œuvre finalize idempotent.
  • createWriter : cette méthode crée un objet Sink.Writer qui écrit un groupe de données à l'emplacement de sortie défini dans votre Sink.

Mettre en œuvre la sous-classe Writer

Votre sous-classe Writer met en œuvre la logique d'écriture d'un seul groupe d'enregistrements à l'emplacement de sortie défini dans votre Sink. Le service Dataflow peut créer plusieurs instances de votre Writer dans des threads différents sur le même poste de travail. L'accès aux membres ou aux méthodes statiques doit donc être sécurisé.

Pour mettre en œuvre un Writer, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • open : cette méthode effectue toute initialisation pour le groupe d'enregistrements à écrire, comme la création d'un fichier temporaire pour l'écriture. Le service Dataflow appelle cette méthode au début de l'écriture et lui transmet un ID de lot unique pour que le lot d'enregistrements puisse être écrit.
  • write : cette méthode écrit un seul enregistrement à l'emplacement de sortie. Le service Dataflow appelle write pour chaque valeur du groupe.
  • close : cette méthode termine l'écriture et ferme toutes les ressources utilisées pour l'écriture du groupe. close doit renvoyer un résultat en écriture, que l'entité englobante WriteOperation utilisera pour identifier les écritures réussies. Le service Dataflow appelle cette méthode à la fin de l'écriture.

Gérer les ID de lot

Lorsque le service appelle Writer.open, il transmet un ID de groupe unique pour les enregistrements à écrire. Votre Writer doit utiliser cet ID de groupe pour s'assurer que sa sortie n'interfère pas avec celle des autres instances Writer qui auraient pu être créées en parallèle. Ce point est particulièrement important, car le service Dataflow peut relancer des écritures plusieurs fois en cas d'échec.

Par exemple, si votre sortie Sink est basée sur des fichiers, Writer peut utiliser l'ID de groupe comme suffixe de nom de fichier pour que votre Writer écrit ses enregistrements dans un fichier de sortie unique qui n'est pas utilisé par les autres Writer. Vous pouvez ensuite faire en sorte que la méthode Writer de close renvoie l'emplacement de ce fichier dans le résultat de l'écriture.

Vous pouvez voir un modèle expliquant comment mettre en œuvre Sink, WriteOperation et Writer avec leurs méthodes abstraites requises dans l'exemple de DatastoreIO du SDK Dataflow.

Classes de base Sink et Writer

Le SDK Dataflow contient des classes de base abstraites pratiques pour vous aider à créer des classes Source et Reader compatibles avec les formats de stockage courants, tels que les fichiers.

FileBasedSink

Si votre source de données utilise des fichiers, vous pouvez déduire votre Sink, WriteOperation, et Writer à partir des classes de base abstraites FileBasedSink, FileBasedWriteOperation, et FileBasedWriter dans le SDK Dataflow pour Java. Ces classes mettent en œuvre le code commun aux sources Dataflow interagissant avec les fichiers, y compris :

  • la définition des en-têtes et des pieds de page de fichiers ;
  • l'écriture séquentielle des enregistrements ;
  • la définition du type de sortie MIME.

FileBasedSink et ses sous-classes sont compatibles avec l'écriture dans les fichiers locaux et dans les fichiers Google Cloud Storage Pour plus d'informations, voir l'exemple de mise en œuvre pour FileBasedSink appelé XmlSink dans le SDK Dataflow pour Java.

Écrire des données dans un récepteur personnalisé

Pour écrire des données dans un pipeline personnalisé, appliquez la transformation générique Write du SDK et transmettez votre récepteur personnalisé en tant que paramètre à l'aide de l'opération .to :

Java

      p.apply("WriteResults", Write.to(new MySink()));