Sources et récepteurs personnalisés

Les SDK Dataflow fournissent une API extensible qui vous permet de créer des sources et récepteurs de données personnalisés. Ces derniers sont requis si vous souhaitez que votre pipeline lise des données (ou en écrive) à partir d'une source ou d'un récepteur de données qui n'est pas compatible de manière native avec les SDK Dataflow.

Pour créer une source personnalisée, vous devez étendre les sous-classes Source abstraites du SDK Dataflow, telles que BoundedSource ou UnboundedSource. Pour créer un récepteur personnalisé, vous devez étendre la classe de base Sink du SDK Dataflow. L'API extensible vous permet de créer des sources personnalisées qui lisent des données limitées (lots) ou illimitées (flux), ainsi que des récepteurs qui n'écrivent que des données limitées.

Dataflow prévoit d'ajouter la compatibilité avec les récepteurs personnalisés qui écrivent des données illimitées dans une prochaine version.

Exigences de base relatives au code des sources et récepteurs personnalisés

Le service Dataflow exploite les classes que vous fournissez pour lire et/ou écrire des données à l'aide de plusieurs instances de nœuds de calcul exécutées en parallèle. Ainsi, le code que vous fournissez pour les sous-classes Source et Sink doit répondre à certaines exigences de base :

Sérialisabilité

Votre sous-classe Source ou Sink, qu'elle soit limitée ou non, doit être Serializable. Le service Dataflow peut créer diverses instances de votre sous-classe Source ou Sink à envoyer à plusieurs nœuds de calcul distants afin de faciliter la lecture ou l'écriture en parallèle.

Immuabilité

Votre sous-classe Source ou Sink doit être effectivement immuable. Tous les champs privés doivent être déclarés comme finaux (à l'aide du paramètre final), et toutes les variables privées du type de collection doivent être effectivement immuables. Si votre classe dispose de mutateurs, ceux-ci doivent renvoyer une copie indépendante de l'objet avec le champ correspondant modifié.

Vous ne devez utiliser l'état modifiable dans votre sous-classe Source ou Sink que si vous procédez à une évaluation minimaliste des calculs coûteux nécessaires à la mise en œuvre de la source. Dans ce cas, vous devez déclarer toutes les variables d'instance modifiables comme transitoires (à l'aide du paramètre transient).

Sécurité

Si vous créez votre source personnalisée afin d'exploiter la fonctionnalité de rééquilibrage dynamique du travail de Dataflow, il est essentiel de rendre le code thread-safe. Le SDK Dataflow pour Java fournit une classe d'assistance qui vous aide à appliquer cette démarche. Pour en savoir plus, consultez la section Utiliser la classe BoundedSource avec le rééquilibrage dynamique du travail ci-dessous.

Testabilité

Il est essentiel de procéder à des tests unitaires exhaustifs de toutes vos sous-classes Source et Sink, en particulier si vous créez vos classes en vue d'exploiter des fonctionnalités avancées telles que le rééquilibrage dynamique du travail de Dataflow. Une erreur mineure lors de la mise en œuvre peut entraîner une corruption ou une perte de données (liée à des enregistrements ignorés ou dupliqués, par exemple) souvent difficile à détecter.

Pour vous aider à tester les sources, le SDK Dataflow fournit la classe SourceTestUtils. SourceTestUtils contient des utilitaires permettant de vérifier automatiquement certaines des propriétés de votre mise en œuvre de BoundedSource. Vous pouvez exploiter SourceTestUtils pour étendre la couverture des tests de votre mise en œuvre à l'aide d'une grande variété d'entrées contenant relativement peu de lignes de code.

Créer une source personnalisée

Pour créer une source de données personnalisée pour votre pipeline, vous devez fournir la logique spécifique au format qui régit la façon dont le service Dataflow lit les données de votre source d'entrée et divise cette source en plusieurs parties pour permettre à plusieurs instances de nœuds de calcul de lire vos données en parallèle. Si vous créez une source de données personnalisée qui lit des données illimitées, vous devez fournir une logique supplémentaire pour gérer le filigrane de votre source et créer des points de contrôle facultatifs.

Pour fournir la logique pour votre source personnalisée, vous devez créer les classes suivantes :

  • Une sous-classe BoundedSource si vous souhaitez lire un ensemble de données limité (par lots), ou une sous-classe UnboundedSource si vous souhaitez lire un ensemble de données illimité (flux). Ces sous-classes décrivent les données que vous souhaitez lire, y compris leur emplacement et leurs paramètres (tels que la quantité de données à lire).
  • Une sous-classe de la classe Source.Reader du SDK Dataflow. Chaque Source doit être associée à un objet Reader qui enregistre tous les états impliqués dans la lecture de cette Source. Il peut s'agir d'opérations de gestion de fichiers, de connexions RPC et d'autres paramètres qui dépendent des exigences spécifiques relatives au format de données que vous souhaitez lire.

    La hiérarchie de la classe Reader reflète la hiérarchie de la Source. Si vous étendez BoundedSource, vous devez fournir une sous-classe BoundedReader associée. Si vous étendez UnboundedSource, vous devez fournir une sous-classe UnboundedReader associée.

Mettre en œuvre la sous-classe Source

Vous devez créer une sous-classe BoundedSource ou UnboundedSource, selon que vos données sont limitées ou non. Dans les deux cas, votre sous-classe Source doit remplacer les méthodes abstraites de la super-classe. Lorsque le service Dataflow utilise votre source de données personnalisée, il exploite ces méthodes pour estimer la taille de votre ensemble de données et le diviser pour la lecture en parallèle.

Votre sous-classe Source doit également gérer les informations de base relatives à votre source de données, telles que l'emplacement. Par exemple, l'exemple de mise en œuvre de Source dans la classe DatastoreIO de Dataflow utilise les arguments suivants : l'hôte host, l'ID d'ensemble de données datasetID et la requête query ayant servi à récupérer les données depuis Datastore.

BoundedSource

BoundedSource représente un ensemble de données limité lisible par le service Dataflow (éventuellement en parallèle). BoundedSource contient un ensemble de méthodes abstraites qui permettent au service de diviser l'ensemble de données pour qu'il puisse être lu par plusieurs nœuds de calcul distants.

Pour mettre en œuvre BoundedSource, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • splitIntoBundles  : cette méthode permet au service Dataflow de diviser vos données limitées en lots d'une taille donnée.
  • getEstimatedSizeBytes : cette méthode permet au service Dataflow d'estimer la taille totale de vos données (en octets).
  • producesSortedKeys : cette méthode indique au service Dataflow si votre source produit des paires clé/valeur triées ou non. Si votre source ne produit pas de paires clé/valeur, votre mise en œuvre de cette méthode doit renvoyer false.
  • createReader : crée la sous-classe BoundedReader associée à BoundedSource.

Pour découvrir comment mettre en œuvre BoundedSource et les méthodes abstraites requises, consultez l'exemple de mise en œuvre de DatastoreIO du SDK Dataflow.

UnboundedSource

UnboundedSource représente un flux de données illimité lisible par le service Dataflow (éventuellement en parallèle). UnboundedSource contient un ensemble de méthodes abstraites qui permettent au service de lire des données en flux continu de façon parallèle. Ces méthodes comprennent la création de points de contrôle pour la reprise après échec, les ID d'enregistrement afin d'empêcher la duplication des données, et la création de filigranes afin d'estimer l'exhaustivité des données dans les parties en aval de votre pipeline.

Pour mettre en œuvre UnboundedSource, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • generateInitialSplits : cette méthode permet au service Dataflow de générer une liste d'objets UnboundedSource qui représente le nombre d'instances de sous-flux que Dataflow doit lire en parallèle.
  • getCheckpointMarkCoder : cette méthode permet au service Dataflow d'obtenir l'objet Coder pour les points de contrôle de votre source (le cas échéant).
  • requiresDeduping : cette méthode permet au service Dataflow de déterminer si les données nécessitent la suppression explicite des enregistrements en double. Si cette méthode renvoie la valeur true, le service Dataflow ajoute automatiquement une étape qui supprime les doublons de la sortie de votre source.
  • createReader : crée la sous-classe UnboundedReader associée à UnboundedSource.

Mettre en œuvre la sous-classe Reader

Vous devez créer une sous-classe BoundedReader ou UnboundedReader qui sera renvoyée par la méthode createReader de votre sous-classe source. Le service Dataflow applique les méthodes sur votre objet Reader (qu'il soit limité ou illimité) pour lire votre ensemble de données.

Les sous-classes BoundedReader et UnboundedReader possèdent des interfaces de base semblables, que vous devez définir. D'autres méthodes supplémentaires propres à UnboundedReader doivent également être mises en œuvre pour que vous puissiez utiliser des données illimitées. En outre, il est possible d'employer une méthode facultative si vous souhaitez que BoundedReader tire parti de la fonctionnalité de rééquilibrage dynamique du travail de Dataflow. Il existe aussi certaines différences mineures au niveau de la sémantique des méthodes start() et advance() lors de l'utilisation de UnboundedReader.

Méthodes du lecteur communes à BoundedReader et UnboundedReader

Le service Dataflow emploie les méthodes suivantes pour lire des données à l'aide de BoundedReader ou UnboundedReader :

  • start : initialise l'objet Reader et le fait avancer jusqu'au premier enregistrement à lire. Cette méthode est appelée exactement une fois lorsque Dataflow commence à lire des données. Il est recommandé d'y intégrer les opérations coûteuses nécessaires à l'initialisation.
  • advance : fait avancer le lecteur jusqu'au prochain enregistrement valide. Cette méthode doit renvoyer false s'il n'y a plus d'entrées disponibles. BoundedReader doit cesser la lecture une fois que la méthode advance renvoie "false", mais UnboundedReader peut renvoyer true lors des prochains appels, lorsque de nouvelles données sont disponibles dans votre flux.
  • getCurrent : renvoie l'enregistrement de données à la position actuelle, qui a été lue en dernier par la méthode start ou advance.
  • getCurrentTimestamp : renvoie l'horodatage associé à l'enregistrement de données actuel. Vous ne devez remplacer la méthode getCurrentTimestamp que si votre source lit des données comportant des horodatages intrinsèques. Le service Dataflow exploite cette valeur pour définir l'horodatage intrinsèque associé à chaque élément de la PCollection de sortie obtenue.

Méthodes du lecteur propres à UnboundedReader

En plus de l'interface de base de Reader, UnboundedReader dispose de méthodes supplémentaires qui permettent de gérer les lectures à partir d'une source de données illimitée :

  • getCurrentRecordId : renvoie un identifiant unique associé à l'enregistrement en cours. Le service Dataflow filtre les enregistrements en double à l'aide de ces ID. Si des ID logiques sont présents dans chaque enregistrement de vos données, cette méthode vous permet de les renvoyer. Si ce n'est pas le cas, vous pouvez renvoyer un hachage (d'au moins 128 bits) du contenu de l'enregistrement. (Il n'est pas recommandé d'utiliser la méthode Object.hashCode() de Java, car un hachage de 32 bits ne suffit généralement pas à éviter les conflits.)
  • Remarque : La mise en œuvre de getCurrentRecordId est facultative si votre source utilise un schéma de création de points de contrôle qui identifie chaque enregistrement de manière unique. Toutefois, les ID d'enregistrement peuvent toujours s'avérer utiles si les systèmes en amont qui écrivent des données dans votre source produisent parfois des enregistrements en double pouvant ensuite être lus par cette dernière.

  • getWatermark : renvoie un filigrane fourni par votre lecteur Reader. Le filigrane correspond à la limite inférieure approximative des horodatages associés aux futurs éléments à lire par votre lecteur Reader. Le service Dataflow exploite le filigrane pour estimer l'exhaustivité des données. Les filigranes sont utilisés dans les fonctions de fenêtrage et de déclenchement de Dataflow.
  • getCheckpointMark : cette méthode permet au service Dataflow de créer un point de contrôle dans votre flux de données. Le point de contrôle représente la progression de UnboundedReader, qui peut être utilisée pour la reprise après échec. Des flux de données différents peuvent utiliser différentes méthodes de création de points de contrôle. Certaines sources peuvent exiger la confirmation des enregistrements reçus, alors que d'autres peuvent recourir à la création de points de contrôle positionnels. Vous devez adapter cette méthode au schéma de création de points de contrôle le plus approprié. Par exemple, vous pouvez demander à cette méthode de renvoyer les enregistrements récemment confirmés.
  • Remarque : La méthode getCheckpointMark est facultative. Vous n'avez pas besoin de la mettre en œuvre si vos données ne possèdent pas de points de contrôle significatifs. Toutefois, si vous choisissez de ne pas mettre en œuvre la création de points de contrôle dans votre source, vous risquez de rencontrer des données en double dans votre pipeline ou d'en perdre, selon que votre source de données tente de renvoyer des enregistrements en cas d'erreur ou non.

Utiliser la classe BoundedSource avec le rééquilibrage dynamique du travail

Si votre source fournit des données limitées, vous pouvez utiliser BoundedReader avec la fonctionnalité de rééquilibrage dynamique du travail du service Dataflow en mettant en œuvre la méthode splitAtFraction. Le service Dataflow peut appeler splitAtFraction en même temps que start ou advance sur un lecteur spécifique. Cela permet de diviser les données restantes de votre Source et de les redistribuer à d'autres nœuds de calcul.

Lorsque vous mettez en œuvre splitAtFraction, votre code doit générer un ensemble de divisions mutuellement exclusives qui, une fois combinées, correspondent à la totalité de l'ensemble de données.

Classes de base Source et Reader

Le SDK Dataflow contient des classes de base abstraites qui vous aident à créer les classes Source et Reader. Ces dernières fonctionnent avec les formats de stockage de données courants, tels que les fichiers.

FileBasedSource

Si votre source de données utilise des fichiers, vous pouvez déduire vos classes Source et Reader à partir des classes de base abstraites FileBasedSource et FileBasedReader du SDK Dataflow pour Java. FileBasedSource est une sous-classe de source illimitée qui met en œuvre le code commun aux sources Dataflow interagissant avec les fichiers, y compris :

  • l'expansion des modèles de fichier ;
  • la lecture séquentielle des enregistrements ;
  • les points de division.

XmlSource

Si votre source de données utilise des fichiers au format XML, vous pouvez dériver votre classe Source à partir de la classe de base abstraite XmlSource du SDK Dataflow pour Java. XmlSource étend FileBasedSource et fournit des méthodes supplémentaires pour l'analyse des fichiers XML, telles que la définition des éléments XML qui désignent la racine du fichier et ses enregistrements individuels.

Lire des données à partir d'une source personnalisée

Pour lire des données à partir d'une source personnalisée de votre pipeline, vous devez appliquer la transformation générique Read du SDK et transmettre votre source personnalisée en tant que paramètre à l'aide de l'opération .from :

Java

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

Créer un récepteur personnalisé

Pour créer un récepteur de données personnalisé pour votre pipeline, vous devez fournir la logique spécifique au format qui régit la façon dont le service Dataflow écrit des données à partir des PCollections de votre pipeline dans un récepteur de sortie, tel qu'un répertoire ou un système de fichiers, une table de base de données, etc. Le service Dataflow écrit des lots de données en parallèle à l'aide de plusieurs nœuds de calcul.

Remarque : Dataflow n'est actuellement compatible qu'avec l'écriture de données illimitées dans un récepteur de sortie personnalisé.

Vous fournissez la logique d'écriture en créant les classes suivantes :

  • Une sous-classe de la classe de base abstraite Sink du SDK Dataflow. Sink décrit une ressource ou un emplacement dans lequel votre pipeline peut écrire des données en parallèle. Votre sous-classe Sink peut contenir des champs tels que l'emplacement de la ressource ou du fichier, le nom de la table de base de données, etc.
  • Une sous-classe Sink.WriteOperation. Sink.WriteOperation représente l'état d'une opération d'écriture parallèle dans l'emplacement de sortie décrit dans votre récepteur Sink. Votre sous-classe WriteOperation doit définir les processus d'initialisation et de finalisation de l'écriture parallèle.
  • Une sous-classe Sink.Writer. Sink.Writer écrit un lot d'éléments dans votre récepteur de données désigné, à partir d'une PCollection d'entrée.

Mettre en œuvre la sous-classe Sink

La sous-classe Sink décrit la ressource ou l'emplacement dans lequel votre pipeline écrit sa sortie. Il peut s'agir d'un emplacement de système de fichiers, d'un nom de table de base de données, d'un nom d'ensemble de données, etc. Votre sous-classe Sink doit valider le fait que l'emplacement de sortie est accessible en écriture et créer des opérations WriteOperation qui définissent la façon dont les données sont écrites dans cet emplacement de sortie.

Pour mettre en œuvre Sink, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • validate : cette méthode garantit que l'emplacement de sortie des données du pipeline est valide et accessible en écriture. validate doit confirmer que le fichier peut être ouvert, que le répertoire de sortie existe, que l'utilisateur dispose d'autorisations d'accès à la table de base de données, etc. Le service Dataflow appelle la méthode validate au moment de la création du pipeline.
  • createWriteOperation : cette méthode crée un objet Sink.WriteOperation qui définit la façon dont les données sont écrites dans l'emplacement de sortie.

Mettre en œuvre la sous-classe WriteOperation

La sous-classe WriteOperation définit la façon dont un lot d'éléments est écrit dans l'emplacement de sortie défini dans votre récepteur Sink. WriteOperation effectue les processus d'initialisation et de finalisation nécessaires à une écriture en parallèle.

Pour mettre en œuvre WriteOperation, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • initialize : cette méthode effectue toutes les étapes d'initialisation nécessaires avant l'écriture de données dans l'emplacement de sortie. Le service Dataflow appelle cette méthode avant le début de l'écriture. Vous pouvez par exemple exécuter la méthode initialize pour créer un répertoire de sortie temporaire.
  • finalize : cette méthode traite les résultats d'une écriture effectuée par une classe Writer. Votre mise en œuvre de finalize doit nettoyer toutes les écritures échouées ou relancées avec succès, et être capable de localiser toute sortie temporaire ou partielle écrite par des écritures ayant échoué.

    Comme la méthode finalize peut être appelée plusieurs fois en cas d'échec ou de nouvelle tentative, il est recommandé de rendre la mise en œuvre de finalize atomique. Si cette opération n'est pas possible, vous devez rendre votre mise en œuvre de finalize idempotente.
  • createWriter : cette méthode crée un objet Sink.Writer qui écrit un lot de données dans l'emplacement de sortie défini dans votre récepteur Sink.

Mettre en œuvre la sous-classe Writer

La sous-classe Writer met en œuvre la logique qui permet d'écrire un lot d'enregistrements dans l'emplacement de sortie défini dans votre récepteur Sink. Le service Dataflow peut instancier plusieurs instances de votre objet Writer dans différents threads du même nœud de calcul. L'accès aux membres ou méthodes statiques doit donc être sécurisé.

Pour mettre en œuvre Writer, votre sous-classe doit remplacer les méthodes abstraites suivantes :

  • open : cette méthode effectue toutes les étapes d'initialisation nécessaires à l'écriture du lot d'enregistrements, telles que la création d'un fichier temporaire pour l'écriture. Le service Dataflow appelle cette méthode au début de l'écriture et lui transmet un ID de lot unique pour que le lot d'enregistrements puisse être écrit.
  • write : cette méthode écrit un enregistrement unique dans l'emplacement de sortie. Le service Dataflow appelle write pour chaque valeur du lot.
  • close : cette méthode termine l'écriture et ferme toutes les ressources utilisées pour l'écriture du lot. La méthode close doit renvoyer un résultat de rédacteur, qui permettra à l'opération englobante WriteOperation d'identifier les écritures réussies. Le service Dataflow appelle cette méthode à la fin de l'écriture.

Gérer les ID de lot

Lorsque le service appelle Writer.open, il transmet un ID de lot unique qui permet l'écriture des enregistrements. Votre objet Writer doit exploiter cet ID de lot pour garantir que sa sortie n'interférera pas avec celle d'autres instances Writer ayant pu être créées en parallèle. Ce point est particulièrement important, car le service Dataflow peut relancer des écritures plusieurs fois en cas d'échec.

Par exemple, si la sortie de votre récepteur Sink est basée sur un fichier, votre classe Writer peut utiliser l'ID de lot comme suffixe de nom de fichier. Cela permet à votre objet Writer d'écrire ses enregistrements dans un fichier de sortie unique inutilisé par d'autres objets Writer. Vous pouvez ensuite faire en sorte que la méthode close de l'objet Writer renvoie cet emplacement de fichier dans le résultat de l'écriture.

Pour découvrir comment mettre en œuvre Sink, WriteOperation, Writer et les méthodes abstraites requises, consultez l'exemple de mise en œuvre de DatastoreIO du SDK Dataflow.

Classes de base Sink et Writer

Le SDK Dataflow contient des classes de base abstraites qui vous aident à créer les classes Source et Reader. Ces dernières fonctionnent avec les formats de stockage de données courants, tels que les fichiers.

FileBasedSink

Si votre source de données utilise des fichiers, vous pouvez déduire vos classes Sink, WriteOperation et Writer à partir des classes de base abstraites FileBasedSink, FileBasedWriteOperation et FileBasedWriter du SDK Dataflow pour Java. Ces classes mettent en œuvre le code commun aux sources Dataflow interagissant avec les fichiers, y compris :

  • la définition des en-têtes et des pieds de page de fichiers ;
  • l'écriture séquentielle des enregistrements ;
  • la définition du type de sortie MIME.

FileBasedSink et ses sous-classes sont compatibles avec l'écriture de données au sein de fichiers locaux et de fichiers Google Cloud Storage. Pour en savoir plus, consultez l'exemple de mise en œuvre de FileBasedSink (appelé XmlSink) du SDK Dataflow pour Java.

Écrire des données dans un récepteur personnalisé

Pour écrire des données dans un récepteur personnalisé de votre pipeline, vous devez appliquer la transformation générique Write du SDK et transmettre votre récepteur personnalisé en tant que paramètre à l'aide de l'opération .to :

Java

  p.apply("WriteResults", Write.to(new MySink()));
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.