"Exactement une fois" dans Dataflow

Dataflow prend en charge le traitement de type "exactement une fois" pour les enregistrements. Cette page explique comment Dataflow met en œuvre le traitement de type "exactement une fois" tout en garantissant une faible latence.

Présentation

Les pipelines de traitement par lot utilisent toujours le traitement "exactement une fois". Par défaut, les pipelines de traitement de flux utilisent le traitement "exactement une fois", mais ils peuvent également utiliser le traitement "au moins une fois".

Le traitement de type "exactement une fois" fournit des garanties sur les résultats du traitement des enregistrements, y compris sur les résultats de chaque étape du pipeline. Plus précisément, pour chaque enregistrement qui arrive au pipeline à partir d'une source, ou qui arrive à une étape de pipeline à partir d'une étape précédente, Dataflow garantit les éléments suivants :

  • L'enregistrement est traité et n'est pas perdu.
  • Tous les résultats de traitement qui restent dans le pipeline sont reflétés au maximum une fois.

En d'autres termes, les enregistrements sont traités au moins une fois et les résultats sont validés exactement une fois.

Le traitement "exactement une fois" garantit l'exactitude des résultats, sans enregistrement en double dans la sortie. Dataflow est optimisé pour minimiser la latence tout en conservant une sémantique de type "exactement une fois". Toutefois, le traitement de type "exactement une fois" entraîne toujours des coûts de déduplication. Pour les cas d'utilisation pouvant tolérer des enregistrements en double, vous pouvez souvent réduire les coûts et améliorer la latence en activant le mode "au moins une fois". Pour vous aider à choisir entre le traitement de flux "exactement une fois" et le traitement "au moins une fois", consultez la section Définir le mode de traitement de flux pour le pipeline.

Données tardives

Le traitement de type "exactement une fois" garantit la précision du pipeline : si le pipeline traite un enregistrement, Dataflow s'assure que l'enregistrement est reflété dans la sortie et que l'enregistrement n'est pas dupliqué.

Cependant, dans un pipeline de traitement de flux, le traitement de type "exactement une fois" ne peut pas garantir que les résultats sont complets, car les enregistrements peuvent arriver en retard. Par exemple, supposons que votre pipeline effectue une agrégation sur une fenêtre temporelle, telle que Count. Avec le traitement de type "exactement une fois", le résultat est exact pour les enregistrements qui arrivent dans la fenêtre en temps opportun, mais les enregistrements tardifs peuvent être supprimés.

En règle générale, il n'existe aucun moyen de garantir l'exhaustivité d'un pipeline de traitement de flux, car en théorie, les enregistrements peuvent arriver en retard de façon arbitraire. Dans le cas limitant, il faudrait parfois attendre indéfiniment pour produire un résultat. Plus précisément, Apache Beam vous permet de configurer le seuil de suppression des données tardives et de définir à quel moment émettre les résultats agrégés. Pour en savoir plus, consultez la section Filigranes et données en retard dans la documentation Apache Beam.

Effets secondaires

Il n'est pas garanti que les effets secondaires aient une sémantique de type "exactement une fois". Il est important de noter que cela inclut l'écriture des résultats dans un magasin externe, sauf si le récepteur met également en œuvre la sémantique de type "exactement une fois".

Plus précisément, Dataflow ne garantit pas que chaque enregistrement subit chaque transformation exactement une fois. En raison de nouvelles tentatives ou d'échecs des nœuds de calcul, Dataflow peut appliquer une transformation plusieurs fois sur un même enregistrement, voire même simultanément sur plusieurs nœuds de calcul.

Dans le cadre du traitement de type "exactement une fois", Dataflow déduplique les résultats. Toutefois, si le code d'une transformation a des effets secondaires, ceux-ci peuvent se produire plusieurs fois. Par exemple, si une transformation effectue un appel à un service distant, cet appel peut être effectué plusieurs fois pour le même enregistrement. Les effets secondaires peuvent même entraîner une perte de données dans certaines situations. Par exemple, supposons qu'une transformation lit un fichier pour produire un résultat, puis supprime immédiatement le fichier sans attendre le commit du résultat. Si une erreur se produit lors du commit du résultat, Dataflow relance la transformation, mais celle-ci ne peut plus lire le fichier supprimé.

Journalisation

La sortie du journal de traitement indique que le traitement s'est produit, mais n'indique pas si les données ont été validées. Par conséquent, les fichiers journaux peuvent indiquer que les données ont été traitées plusieurs fois, même si les résultats des données traitées ne sont validés qu'une seule fois pour le stockage persistant. De plus, les journaux ne reflètent pas toujours les données traitées et validées. Les journaux peuvent être supprimés en raison d'un ralentissement ou d'une perte résultant d'autres problèmes du service de journalisation.

Traitement de flux "exactement une fois"

Cette section explique comment Dataflow met en œuvre le traitement de type "exactement une fois" pour les tâches de traitement par flux, y compris la manière dont Dataflow gère des complexités telles que le traitement non déterministe, les données tardives et le code personnalisé.

Brassage de flux Dataflow

Les tâches Dataflow de traitement par flux s'exécutent sur de nombreux nœuds de calcul différents en parallèle en attribuant des plages de travail à chaque nœud de calcul. Bien que les attributions puissent changer au fil du temps en réponse à des échecs de nœuds de calcul, à l'autoscaling ou à d'autres événements, après chaque transformation GroupByKey, tous les enregistrements ayant la même clé sont traités sur le même nœud de calcul. La transformation GroupByKey est souvent utilisée par les transformations composites, telles que Count, FileIO, etc. Pour garantir que les enregistrements d'une clé donnée se retrouvent sur le même nœud de calcul, les nœuds de calcul Dataflow brassent les données entre eux à l'aide d'appels de procédure à distance (RPC).

Afin d'éviter la perte des enregistrements lors du brassage, Dataflow utilise la sauvegarde en amont. Avec la sauvegarde en amont, le nœud de calcul qui envoie les enregistrements relance les RPC jusqu'à ce qu'il reçoive un accusé de réception positif. Les effets secondaires du traitement de l'enregistrement sont validés sur le stockage persistant en aval. Si le nœud de calcul qui envoie les enregistrements devient indisponible, Dataflow continue de relancer les RPC, ce qui garantit que chaque enregistrement est distribué au moins une fois.

Étant donné que ces tentatives peuvent créer des doublons, chaque message est étiqueté avec un ID unique. Chaque récepteur stocke un catalogue de tous les ID qui ont déjà été vus et traités. Lorsqu'un enregistrement est reçu, Dataflow recherche son ID dans le catalogue. Si cet ID est trouvé, l'enregistrement a déjà été reçu et validé, l'enregistrement est donc supprimé en tant que doublon. Pour garantir la stabilité des ID d'enregistrement, chaque sortie de chaque étape est pointée vers l'espace de stockage. Par conséquent, si le même message est envoyé plusieurs fois en raison d'appels RPC répétés, le message n'est validé pour le stockage qu'une seule fois.

Garantir une faible latence

Pour que le traitement de type "exactement une fois" soit viable, les E/S doivent être réduits, en particulier en empêchant les E/S sur chaque enregistrement. Pour atteindre cet objectif, Dataflow utilise des filtres Bloom et la récupération de mémoire.

Filtres Bloom

Les filtres Bloom sont des structures de données compactes qui permettent de vérifier rapidement les membres. Dans Dataflow, chaque nœud de calcul conserve un filtre Bloom de chaque ID qu'il voit. Lorsqu'un nouvel ID d'enregistrement arrive, le nœud de calcul recherche l'ID dans le filtre. Si le filtre renvoie la valeur "false", cet enregistrement n'est pas un doublon et le nœud de calcul ne recherche pas l'ID dans l'espace de stockage stable.

Dataflow conserve un ensemble de filtres Bloom progressifs regroupés par heure. Lorsqu'un enregistrement arrive, Dataflow sélectionne le filtre approprié à vérifier en fonction de l'horodatage du système. Cette étape empêche la saturation des filtres Bloom lors de la récupération de mémoire et limite également la quantité de données à analyser au démarrage.

Récupération de mémoire

Pour éviter de remplir le stockage avec des ID d'enregistrement, Dataflow utilise la récupération de mémoire pour supprimer les anciens enregistrements. Dataflow utilise l'horodatage du système pour calculer un filigrane de récupération de mémoire.

Ce filigrane est basé sur le temps physique passé à attendre une étape donnée. Par conséquent, il fournit également des informations sur les parties du pipeline lentes. Ces métadonnées constituent la base de la métrique de latence du système indiquée dans l'interface de surveillance de Dataflow.

Si un enregistrement arrive avec un horodatage antérieur au filigrane et si des ID ont déjà été récupérés de la mémoire, l'enregistrement est ignoré. Comme le faible filigrane qui déclenche la récupération de mémoire n'avance pas tant que les livraisons d'enregistrements n'ont pas été confirmées, ces enregistrements tardifs sont en double.

Sources non déterministes

Dataflow utilise le SDK Apache Beam pour lire des données dans des pipelines. Si le traitement échoue, Dataflow peut relancer des lectures à partir d'une source. Dans ce cas, Dataflow doit s'assurer que chaque enregistrement unique produit par une source est enregistré une seule fois. Pour les sources déterministes, telles que Pub/Sub Lite ou Kafka, les enregistrements sont lus en fonction d'un décalage enregistré, ce qui réduit le besoin pour cette étape.

Étant donné que Dataflow ne peut pas attribuer automatiquement les ID d'enregistrement, les sources non déterministes doivent indiquer à Dataflow les ID d'enregistrement afin d'éviter la duplication. Lorsqu'une source fournit des ID uniques pour chaque enregistrement, le connecteur utilise un brassage dans le pipeline pour supprimer les doublons. Les enregistrements ayant le même ID sont filtrés. Pour obtenir un exemple de mise en œuvre du traitement de type "exactement une fois" dans Dataflow en utilisant Pub/Sub comme source, consultez la section Déduplication efficace de la page "Streaming avec Pub/Sub".

Lorsque vous exécutez des conteneurs DoFn personnalisés dans le cadre de votre pipeline, Dataflow ne garantit pas que ce code n'est exécuté qu'une seule fois par enregistrement. Pour garantir au moins un traitement en cas de défaillance d'un nœud de calcul, Dataflow peut exécuter un enregistrement donné via une transformation plusieurs fois ou exécuter le même enregistrement simultanément sur plusieurs nœuds de calcul. Si vous incluez dans votre pipeline du code qui contacte un service externe, par exemple, les actions peuvent être exécutées plusieurs fois pour un enregistrement donné.

Pour rendre le traitement non déterministe efficacement déterministe, utilisez les points de contrôle. Lorsque vous utilisez des points de contrôle, chaque sortie d'une transformation est redirigée vers un stockage stable avec son ID unique avant d'être transmise à l'étape suivante. Les nouvelles tentatives de distribution de Dataflow renvoient le résultat qui a fait l'objet d'un point de contrôle. Bien que votre code puisse être exécuté plusieurs fois, Dataflow s'assure que le résultat d'une seule de ces exécutions est stocké. Dataflow utilise un magasin cohérent qui empêche l'écriture de doublons dans un espace de stockage stable.

Diffusion des résultats "exactement une fois"

Le SDK Apache Beam inclut des récepteurs intégrés conçus pour garantir qu'ils ne produisent pas de doublons. Dans la mesure du possible, utilisez l'un de ces récepteurs intégrés.

Si vous devez écrire votre propre récepteur, la meilleure approche consiste à rendre votre objet fonction idempotent afin qu'il puisse faire l'objet de nouvelles tentatives aussi souvent que nécessaire sans provoquer d'effets secondaires inattendus. Néanmoins, souvent, un composant de la transformation qui met en œuvre la fonctionnalité de récepteur est non déterministe et peut changer si une nouvelle tentative est effectuée.

Par exemple, dans une agrégation fenêtrée, l'ensemble d'enregistrements de la fenêtre peut être non déterministe. Plus précisément, la fenêtre peut tenter de se déclencher avec les éléments e0, e1 et e2. Le nœud de calcul peut planter avant de valider le traitement de la fenêtre, mais pas avant que ces éléments ne soient envoyés en tant qu'effet secondaire. Lorsque le nœud de calcul redémarre, la fenêtre se déclenche à nouveau et un élément e3 tardif arrive. Étant donné que cet élément arrive avant le commit de la fenêtre, il n'est pas comptabilisé en tant que donnée tardive. Par conséquent, DoFn est appelé à nouveau avec les éléments e0, e1, e2, e3. Ces éléments sont ensuite envoyés à l'opération d'effet secondaire. L'idempotence n'aide pas ce scénario, car différents jeux d'enregistrements logiques sont envoyés à chaque fois.

Pour gérer la non-déterminité dans Dataflow, utilisez la transformation Reshuffle intégrée. Lorsque Dataflow brasse des données, il les écrit de manière durable de sorte que tous les éléments non déterministes sont stables si les opérations sont relancées après le brassage. L'utilisation de la transformation Reshuffle garantit qu'une seule version de sortie d'une DoFn peut passer au-delà d'une limite de brassage. Le modèle suivant garantit que l'opération d'effet secondaire reçoit toujours un enregistrement déterministe à générer:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Pour vous assurer que l'exécuteur Dataflow sait que les éléments doivent être stables avant d'exécuter une opération DoFn, ajoutez l'annotation RequiresStableInput au DoFn.

En savoir plus